You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/02/07 20:35:15 UTC
[curator] branch CURATOR-505 updated: CURATOR-505 -
refactoring/refining a new listener container that doesn't rely on Guava
and supports mapping. We need for this PR anyway.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/CURATOR-505 by this push:
new c9e3cf6 CURATOR-505 - refactoring/refining a new listener container that doesn't rely on Guava and supports mapping. We need for this PR anyway.
c9e3cf6 is described below
commit c9e3cf653997d488cc57a7bc0f06b7962707cef1
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Feb 7 15:35:10 2019 -0500
CURATOR-505 - refactoring/refining a new listener container that doesn't rely on Guava and supports mapping. We need for this PR anyway.
---
.../curator/framework/CuratorFrameworkFactory.java | 5 +-
.../framework/listen/ListenerContainer.java | 4 ++
.../curator/framework/listen/ListenerManager.java | 44 ++++++++++++
...rContainer.java => MappingListenerManager.java} | 61 +++++++++-------
.../framework/listen/StandardListenerManager.java | 84 ++++++++++++++++++++++
.../curator/framework/state/CircuitBreaker.java | 18 +++++
.../CircuitBreakingConnectionStateListener.java | 18 +++++
.../state/ConnectionStateListenerDecorator.java | 18 +++++
.../framework/state/ConnectionStateManager.java | 7 +-
.../framework/state/TestCircuitBreaker.java | 18 +++++
...TestCircuitBreakingConnectionStateListener.java | 18 +++++
11 files changed, 263 insertions(+), 32 deletions(-)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 283a093..a5c08ff 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -498,9 +498,8 @@ public class CuratorFrameworkFactory
}
/**
- * Sets the connection state listener decorator. Curator recipes (and proper client code)
- * will always decorate connection state listeners via this decorator. For example,
- * you can set use {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
+ * Sets the connection state listener decorator. For example,
+ * you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
* via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
* or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
*
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
index 60ae501..9139439 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
@@ -29,7 +29,11 @@ import java.util.concurrent.Executor;
/**
* Abstracts an object that has listeners
+ *
+ * @deprecated Prefer {@link MappingListenerManager} and
+ * {@link StandardListenerManager}
*/
+@Deprecated
public class ListenerContainer<T> implements Listenable<T>
{
private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
new file mode 100644
index 0000000..cab0426
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.listen;
+
+import java.util.function.Consumer;
+
+public interface ListenerManager<K, V> extends Listenable<K>
+{
+ /**
+ * Remove all listeners
+ */
+ void clear();
+
+ /**
+ * Return the number of listeners
+ *
+ * @return number
+ */
+ int size();
+
+ /**
+ * Utility - apply the given function to each listener. The function receives
+ * the listener as an argument.
+ *
+ * @param function function to call for each listener
+ */
+ void forEach(Consumer<V> function);
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
similarity index 52%
rename from curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java
rename to curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index 3a37ecb..f230da9 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -1,33 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.listen;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.UnaryOperator;
/**
* Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
* doesn't leak Guava's internals and also supports mapping/wrapping of listeners
*/
-public class MappingListenerContainer<K, V> implements Listenable<K>
+public class MappingListenerManager<K, V> implements ListenerManager<K, V>
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final Map<K, ListenerEntry<V>> listeners = Maps.newConcurrentMap();
+ private final Map<K, ListenerEntry<V>> listeners = new ConcurrentHashMap<>();
private final Function<K, V> mapper;
/**
- * Returns a new standard version that does no mapping
+ * Returns a new mapping container that maps to the same type
*
+ * @param mapper listener mapper/wrapper
* @return new container
*/
- public static <T> MappingListenerContainer<T, T> nonMapping()
+ public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
{
- return new MappingListenerContainer<>(Function.identity());
+ MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
+ return new StandardListenerManager<>(container);
}
/**
@@ -36,22 +56,22 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
* @param mapper listener mapper/wrapper
* @return new container
*/
- public static <K, V> MappingListenerContainer<K, V> mapping(Function<K, V> mapper)
+ public static <K, V> ListenerManager<K, V> mapping(Function<K, V> mapper)
{
- return new MappingListenerContainer<>(mapper);
+ return new MappingListenerManager<>(mapper);
}
@Override
public void addListener(K listener)
{
- addListener(listener, MoreExecutors.directExecutor());
+ addListener(listener, Runnable::run);
}
@Override
public void addListener(K listener, Executor executor)
{
V mapped = mapper.apply(listener);
- listeners.put(listener, new ListenerEntry<V>(mapped, executor));
+ listeners.put(listener, new ListenerEntry<>(mapped, executor));
}
@Override
@@ -63,30 +83,19 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
}
}
- /**
- * Remove all listeners
- */
+ @Override
public void clear()
{
listeners.clear();
}
- /**
- * Return the number of listeners
- *
- * @return number
- */
+ @Override
public int size()
{
return listeners.size();
}
- /**
- * Utility - apply the given function to each listener. The function receives
- * the listener as an argument.
- *
- * @param function function to call for each listener
- */
+ @Override
public void forEach(Consumer<V> function)
{
for ( ListenerEntry<V> entry : listeners.values() )
@@ -105,7 +114,7 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
}
}
- private MappingListenerContainer(Function<K, V> mapper)
+ MappingListenerManager(Function<K, V> mapper)
{
this.mapper = mapper;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
new file mode 100644
index 0000000..8b60ac1
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.listen;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Non-mapping version of a listener container
+ */
+public class StandardListenerManager<T> implements ListenerManager<T, T>
+{
+ private final ListenerManager<T, T> container;
+
+ /**
+ * Returns a new standard listener container
+ *
+ * @return new container
+ */
+ public static <T> StandardListenerManager<T> standard()
+ {
+ MappingListenerManager<T, T> container = new MappingListenerManager<>(Function.identity());
+ return new StandardListenerManager<>(container);
+ }
+
+ public StandardListenerManager(ListenerManager<T, T> container)
+ {
+ this.container = Objects.requireNonNull(container, "container cannot be null");
+ }
+
+ @Override
+ public void addListener(T listener)
+ {
+ container.addListener(listener);
+ }
+
+ @Override
+ public void addListener(T listener, Executor executor)
+ {
+ container.addListener(listener, executor);
+ }
+
+ @Override
+ public void removeListener(T listener)
+ {
+ container.removeListener(listener);
+ }
+
+ @Override
+ public void clear()
+ {
+ container.clear();
+ }
+
+ @Override
+ public int size()
+ {
+ return container.size();
+ }
+
+ @Override
+ public void forEach(Consumer<T> function)
+ {
+ container.forEach(function);
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
index 504edbc..78dc9af 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.state;
import org.apache.curator.RetryPolicy;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
index dba651a..24eba01 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.state;
import org.apache.curator.RetryPolicy;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
index 0ac808b..b95c4b3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.state;
import org.apache.curator.RetryPolicy;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 3654f61..583b9f2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,8 @@ package org.apache.curator.framework.state;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerContainer;
+import org.apache.curator.framework.listen.MappingListenerManager;
+import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class ConnectionStateManager implements Closeable
private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
private final ExecutorService service;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final MappingListenerContainer<ConnectionStateListener, ConnectionStateListener> listeners;
+ private final StandardListenerManager<ConnectionStateListener> listeners;
// guarded by sync
private ConnectionState currentConnectionState;
@@ -113,7 +114,7 @@ public class ConnectionStateManager implements Closeable
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
service = Executors.newSingleThreadExecutor(threadFactory);
- listeners = MappingListenerContainer.mapping(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+ listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
}
/**
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
index e2daa96..37833b9 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.state;
import org.apache.curator.retry.RetryForever;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
index 1712eed..5c80a9a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.state;
import org.apache.curator.RetryPolicy;