You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/26 19:03:25 UTC

git commit: S4-120 ensure application-singleton scopes for more dependencies

Updated Branches:
  refs/heads/S4-120 [created] 81f3ebad2


S4-120 ensure application-singleton scopes for more dependencies

- use @Singleton annotations on implementation classes where applicable, and in(Scopes.SINGLETON) on the binding otherwise (providers)


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/81f3ebad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/81f3ebad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/81f3ebad

Branch: refs/heads/S4-120
Commit: 81f3ebad28ce958b9095190c7d5245e3d31c24c0
Parents: 755ed6b
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Feb 26 16:48:25 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Feb 26 20:00:59 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |   11 ++++++-----
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    3 ++-
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |    2 ++
 .../org/apache/s4/comm/topology/ClusterFromZK.java |    2 ++
 .../apache/s4/comm/topology/ClustersFromZK.java    |    3 ++-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |    2 ++
 .../java/org/apache/s4/comm/udp/UDPListener.java   |   14 +++++++++-----
 .../java/org/apache/s4/core/DefaultCoreModule.java |    2 +-
 8 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index c557413..0c3f944 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -90,25 +90,26 @@ public class DefaultCommModule extends AbstractModule {
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
 
-        bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+        bind(Cluster.class).to(ClusterFromZK.class);
 
-        bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
+        bind(Clusters.class).to(ClustersFromZK.class);
 
-        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class);
 
         bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
 
         try {
             Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
                     .getString("s4.comm.emitter.class"));
-            bind(Emitter.class).to(emitterClass);
+
+            bind(Emitter.class).to(emitterClass).in(Scopes.SINGLETON);
 
             // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
             Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
                     .getString("s4.comm.emitter.remote.class"));
             install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
                     RemoteEmitterFactory.class));
-            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class);
 
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index fd0ad2b..2a44aac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -59,6 +59,7 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 /**
@@ -69,7 +70,7 @@ import com.google.inject.name.Named;
  * A throttling mechanism is also provided, so that back pressure can be applied if consumers are too slow.
  * 
  */
-
+@Singleton
 public class TCPEmitter implements Emitter, ClusterChangeListener {
     private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index c7673ae..48cac40 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -49,12 +49,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 /**
  * Receives messages through TCP for the assigned subcluster.
  * 
  */
+@Singleton
 public class TCPListener implements Listener {
     private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
     private ClusterNode node;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 45e0ae4..9ebca3d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 /**
@@ -42,6 +43,7 @@ import com.google.inject.name.Named;
  * configuration.
  * 
  */
+@Singleton
 public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
 
     private static Logger logger = LoggerFactory.getLogger(ClusterFromZK.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index b9fde5e..65950c1 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -28,18 +28,19 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
 /**
  * Monitors all clusters
  * 
  */
+@Singleton
 public class ClustersFromZK implements Clusters, IZkStateListener {
     private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
     private KeeperState state;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index d335970..e3f8a80 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -37,11 +37,13 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 /**
  * UDP based emitter.
  * 
  */
+@Singleton
 public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
     private final HashBiMap<Integer, ClusterNode> nodes;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index e719cbc..9ea0e89 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -33,21 +33,23 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 /**
  * 
  * Implementation of a simple UDP listener.
  * 
  */
+@Singleton
 public class UDPListener implements Listener, Runnable {
 
     private DatagramSocket socket;
-    private DatagramPacket datagram;
-    private byte[] bs;
+    private final DatagramPacket datagram;
+    private final byte[] bs;
     static int BUFFER_LENGTH = 65507;
-    private BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
-    private ClusterNode node;
-    private Receiver receiver;
+    private final BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
+    private final ClusterNode node;
+    private final Receiver receiver;
 
     @Inject
     public UDPListener(Assignment assignment, final Receiver receiver) {
@@ -76,6 +78,7 @@ public class UDPListener implements Listener, Runnable {
         (new Thread(this)).start();
     }
 
+    @Override
     public void run() {
         try {
             while (!Thread.interrupted()) {
@@ -104,6 +107,7 @@ public class UDPListener implements Listener, Runnable {
         }
     }
 
+    @Override
     public int getPartitionId() {
         return node.getPartition();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/81f3ebad/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index c4cafee..1c5ae9c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -90,7 +90,7 @@ public class DefaultCoreModule extends AbstractModule {
 
         // For enabling checkpointing, one needs to use a custom module, such as
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
-        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class).in(Scopes.SINGLETON);
 
         // shed load in local sender only by default
         bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);