You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/10/26 10:40:35 UTC

git commit: Add getters for partition id and partition count in App class

Updated Branches:
  refs/heads/S4-102 [created] bbbaba443


Add getters for partition id and partition count in App class


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/bbbaba44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/bbbaba44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/bbbaba44

Branch: refs/heads/S4-102
Commit: bbbaba4432f0654f837eaae8be4472aa4bd69a61
Parents: a7f86ac
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Oct 26 12:30:59 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Oct 26 12:38:01 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |    3 +-
 .../src/main/java/org/apache/s4/core/App.java      |   35 ++++++++++++---
 .../java/org/apache/s4/core/DefaultCoreModule.java |    3 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |    2 +-
 .../src/main/java/org/apache/s4/core/Sender.java   |   19 ++++++--
 .../org/apache/s4/fixtures/MockCoreModule.java     |    5 ++
 .../org/apache/s4/wordcount/WordCounterPE.java     |   26 +++++++----
 7 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..59409a7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -47,7 +47,8 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 /**
- * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ * Default configuration module for the communication layer. Parameterizable / overridable through custom modules
+ * and/or a configuration file (for string literals).
  * 
  */
 public class DefaultCommModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 936d225..673d0bb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -28,6 +28,7 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
@@ -41,7 +42,7 @@ import com.google.inject.name.Named;
 
 /**
  * Container base class to hold all processing elements.
- *
+ * 
  * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {
@@ -57,7 +58,7 @@ public abstract class App {
     final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
 
     /* Pes indexed by name. */
-    Map<String, ProcessingElement> peByName = Maps.newHashMap();
+    final Map<String, ProcessingElement> peByName = Maps.newHashMap();
 
     private ClockType clockType = ClockType.WALL_CLOCK;
     private int id = -1;
@@ -68,17 +69,20 @@ public abstract class App {
     private Receiver receiver;
 
     @Inject
-    RemoteSenders remoteSenders;
+    private Cluster cluster;
+
+    @Inject
+    private RemoteSenders remoteSenders;
 
     @Inject
-    Hasher hasher;
+    private Hasher hasher;
 
     @Inject
-    RemoteStreams remoteStreams;
+    private RemoteStreams remoteStreams;
 
     @Inject
     @Named("s4.cluster.name")
-    String clusterName;
+    private String clusterName;
 
     // default is NoOpCheckpointingFramework
     @Inject
@@ -256,6 +260,25 @@ public abstract class App {
     }
 
     /**
+     * 
+     * Returns the id of the partition assigned to the current node.
+     * 
+     * NOTE: This method will block until the current node gets assigned a partition
+     * 
+     */
+    public int getPartitionId() {
+        return getReceiver().getPartitionId();
+    }
+
+    /**
+     * 
+     * Returns the total number of partitions of the cluster this node belongs to.
+     */
+    public int getPartitionCount() {
+        return cluster.getPhysicalCluster().getPartitionCount();
+    }
+
+    /**
      * @return the sender object
      */
     public Sender getSender() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..0c42f48 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -41,8 +41,7 @@ import com.google.inject.Binder;
 import com.google.inject.name.Names;
 
 /**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
+ * Default module allowing assignment from ZK, communication through Netty, and distributed deployment management
  * 
  */
 public class DefaultCoreModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 6c0b19c..68840c0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -67,7 +67,7 @@ public class Receiver implements Runnable {
         streams = new MapMaker().makeMap();
     }
 
-    int getPartition() {
+    int getPartitionId() {
         return listener.getPartitionId();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..b5dcf6e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -24,6 +24,8 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +40,17 @@ import com.google.inject.Inject;
  * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
  * from the application developer.
  */
-public class Sender {
+public class Sender implements ClusterChangeListener {
 
     private static Logger logger = LoggerFactory.getLogger(Sender.class);
 
     final private Emitter emitter;
     final private SerializerDeserializer serDeser;
     final private Hasher hasher;
+    final private Cluster cluster;
 
-    Assignment assignment;
-    private int localPartitionId = -1;
+    final private Assignment assignment;
+    private volatile int localPartitionId = -1;
 
     /**
      * 
@@ -59,11 +62,14 @@ public class Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
+    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+            Cluster cluster) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
         this.assignment = assignment;
+        this.cluster = cluster;
+        this.cluster.addListener(this);
     }
 
     @Inject
@@ -121,4 +127,9 @@ public class Sender {
         }
     }
 
+    @Override
+    public void onChange() {
+        resolveLocalPartitionId();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 71bae7a..6a60ec5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,6 +20,8 @@ package org.apache.s4.fixtures;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.PhysicalCluster;
 import org.apache.s4.core.Receiver;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
@@ -47,5 +49,8 @@ public class MockCoreModule extends AbstractModule {
         bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
         bind(Listener.class).toInstance(Mockito.mock(Listener.class));
         bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+        Cluster clusterMock = Mockito.mock(Cluster.class);
+        Mockito.when(clusterMock.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
+        bind(Cluster.class).toInstance(clusterMock);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bbbaba44/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
index c9d8635..a124db3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
@@ -23,39 +23,45 @@ import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 
 public class WordCounterPE extends ProcessingElement {
-    
+
     int wordCounter;
     transient Stream<WordCountEvent> wordClassifierStream;
 
-    private WordCounterPE() {}
-    
+    private WordCounterPE() {
+    }
+
     public WordCounterPE(App app) {
         super(app);
     }
-    
+
     public void setWordClassifierStream(Stream<WordCountEvent> stream) {
         this.wordClassifierStream = stream;
     }
 
-    public void onEvent(WordSeenEvent event) { 
+    public void onEvent(WordSeenEvent event) {
+
         wordCounter++;
         System.out.println("seen word " + event.getWord());
-        // NOTE: it seems the id is the key for now...     
+        // NOTE: it seems the id is the key for now...
         wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+
+        // add some tests for partition count and id
+        if (!((getApp().getPartitionCount() == 1) && (getApp().getPartitionId() == 0))) {
+            throw new RuntimeException("Invalid partitioning");
+        }
+
     }
 
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
-        
+
     }
 
     @Override
     protected void onRemove() {
         // TODO Auto-generated method stub
-        
-    }
 
-   
+    }
 
 }