You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/26 19:03:25 UTC
git commit: S4-120 ensure application-singleton scopes for more
dependencies
Updated Branches:
refs/heads/S4-120 [created] 81f3ebad2
S4-120 ensure application-singleton scopes for more dependencies
- use @Singleton annotations where applicable, in(Scopes.SINGLETON) otherwise (providers)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/81f3ebad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/81f3ebad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/81f3ebad
Branch: refs/heads/S4-120
Commit: 81f3ebad28ce958b9095190c7d5245e3d31c24c0
Parents: 755ed6b
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Feb 26 16:48:25 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Feb 26 20:00:59 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/comm/DefaultCommModule.java | 11 ++++++-----
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 3 ++-
.../java/org/apache/s4/comm/tcp/TCPListener.java | 2 ++
.../org/apache/s4/comm/topology/ClusterFromZK.java | 2 ++
.../apache/s4/comm/topology/ClustersFromZK.java | 3 ++-
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 2 ++
.../java/org/apache/s4/comm/udp/UDPListener.java | 14 +++++++++-----
.../java/org/apache/s4/core/DefaultCoreModule.java | 2 +-
8 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index c557413..0c3f944 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -90,25 +90,26 @@ public class DefaultCommModule extends AbstractModule {
install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
SerializerDeserializerFactory.class));
- bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+ bind(Cluster.class).to(ClusterFromZK.class);
- bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
+ bind(Clusters.class).to(ClustersFromZK.class);
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+ bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class);
bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
try {
Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
.getString("s4.comm.emitter.class"));
- bind(Emitter.class).to(emitterClass);
+
+ bind(Emitter.class).to(emitterClass).in(Scopes.SINGLETON);
// RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
.getString("s4.comm.emitter.remote.class"));
install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
RemoteEmitterFactory.class));
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+ bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class);
} catch (ClassNotFoundException e) {
logger.error("Cannot find class implementation ", e);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index fd0ad2b..2a44aac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -59,6 +59,7 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
/**
@@ -69,7 +70,7 @@ import com.google.inject.name.Named;
* A throttling mechanism is also provided, so that back pressure can be applied if consumers are too slow.
*
*/
-
+@Singleton
public class TCPEmitter implements Emitter, ClusterChangeListener {
private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index c7673ae..48cac40 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -49,12 +49,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
/**
* Receives messages through TCP for the assigned subcluster.
*
*/
+@Singleton
public class TCPListener implements Listener {
private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
private ClusterNode node;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 45e0ae4..9ebca3d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
/**
@@ -42,6 +43,7 @@ import com.google.inject.name.Named;
* configuration.
*
*/
+@Singleton
public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
private static Logger logger = LoggerFactory.getLogger(ClusterFromZK.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index b9fde5e..65950c1 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -28,18 +28,19 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
/**
* Monitors all clusters
*
*/
+@Singleton
public class ClustersFromZK implements Clusters, IZkStateListener {
private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
private KeeperState state;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index d335970..e3f8a80 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -37,11 +37,13 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
/**
* UDP based emitter.
*
*/
+@Singleton
public class UDPEmitter implements Emitter, ClusterChangeListener {
private DatagramSocket socket;
private final HashBiMap<Integer, ClusterNode> nodes;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index e719cbc..9ea0e89 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -33,21 +33,23 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
/**
*
* Implementation of a simple UDP listener.
*
*/
+@Singleton
public class UDPListener implements Listener, Runnable {
private DatagramSocket socket;
- private DatagramPacket datagram;
- private byte[] bs;
+ private final DatagramPacket datagram;
+ private final byte[] bs;
static int BUFFER_LENGTH = 65507;
- private BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
- private ClusterNode node;
- private Receiver receiver;
+ private final BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
+ private final ClusterNode node;
+ private final Receiver receiver;
@Inject
public UDPListener(Assignment assignment, final Receiver receiver) {
@@ -76,6 +78,7 @@ public class UDPListener implements Listener, Runnable {
(new Thread(this)).start();
}
+ @Override
public void run() {
try {
while (!Thread.interrupted()) {
@@ -104,6 +107,7 @@ public class UDPListener implements Listener, Runnable {
}
}
+ @Override
public int getPartitionId() {
return node.getPartition();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index c4cafee..1c5ae9c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -90,7 +90,7 @@ public class DefaultCoreModule extends AbstractModule {
// For enabling checkpointing, one needs to use a custom module, such as
// org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
- bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+ bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class).in(Scopes.SINGLETON);
// shed load in local sender only by default
bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);