Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/07 03:04:35 UTC

git commit: Interface changes for emitter to accept Destination instead of partition. Fairly invasive; non-functional.

Updated Branches:
  refs/heads/S4-110-new 24a11e18b -> a65684e9b


Interface changes for emitter to accept Destination instead of partition. Fairly invasive; non-functional.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a65684e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a65684e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a65684e9

Branch: refs/heads/S4-110-new
Commit: a65684e9b54d5f2b8710957ee34abe27a5b37b89
Parents: 24a11e1
Author: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Authored: Wed Feb 6 19:04:19 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Wed Feb 6 19:04:19 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/s4/base/Destination.java  |    5 +
 .../src/main/java/org/apache/s4/base/Emitter.java  |    8 +-
 .../org/apache/s4/comm/tcp/TCPDestination.java     |   13 ++
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   92 +++++++--------
 .../java/org/apache/s4/comm/topology/Cluster.java  |    5 +-
 .../apache/s4/comm/topology/ClusterFromHelix.java  |   70 ++++++++---
 .../org/apache/s4/comm/topology/ClusterFromZK.java |    3 +-
 .../apache/s4/comm/topology/PhysicalCluster.java   |   14 +-
 .../org/apache/s4/comm/udp/UDPDestination.java     |   14 +++
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   36 +++---
 .../org/apache/s4/comm/util/EmitterMetrics.java    |   31 ++++--
 .../org/apache/s4/core/DefaultRemoteSenders.java   |    8 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   16 ++-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   21 +++-
 .../java/org/apache/s4/core/util/S4Metrics.java    |    3 +-
 .../apache/s4/tools/helix/GenericEventAdapter.java |    4 +-
 16 files changed, 219 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
new file mode 100644
index 0000000..7b099f6
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
@@ -0,0 +1,5 @@
+package org.apache.s4.base;
+
+public interface Destination {
+
+}
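
Destination is deliberately a bare marker interface: each transport supplies its own concrete type (TCPDestination and UDPDestination below) and the rest of the stack passes the reference around opaquely. A minimal sketch of a hypothetical additional implementation, e.g. for in-JVM tests (LoopbackDestination is not part of this commit):

    package org.apache.s4.base;

    // Hypothetical test-only destination: identifies a local partition with no
    // transport details, illustrating that Destination itself requires no methods.
    public class LoopbackDestination implements Destination {

        private final int partition;

        public LoopbackDestination(int partition) {
            this.partition = partition;
        }

        public int getPartition() {
            return partition;
        }

        @Override
        public String toString() {
            return "loopback-" + partition;
        }
    }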

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index e2ee412..fbc89b4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -37,11 +37,13 @@ public interface Emitter {
      * @throws InterruptedException
      *             if interrupted during blocking send operation
      */
-    boolean send(int partitionId, ByteBuffer message) throws InterruptedException;
+    boolean send(Destination destination , ByteBuffer message) throws InterruptedException;
 
-    int getPartitionCount();
+    //int getPartitionCount();
     
     int getPartitionCount(String stream);
-
+    
+    String getType();
+    
     void close();
 }
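
With this signature, callers no longer pass a raw partition id to the emitter; they resolve a Destination through the Cluster first, keyed by the emitter's transport type. A usage sketch under those assumptions (the stream name and partition below are made up for illustration):

    import java.nio.ByteBuffer;

    import org.apache.s4.base.Destination;
    import org.apache.s4.base.Emitter;
    import org.apache.s4.comm.topology.Cluster;

    public class EmitterUsageSketch {

        // Resolve the target through the cluster, then hand the opaque Destination to
        // the emitter; partition arithmetic stays on the caller's side.
        public boolean sendTo(Cluster cluster, Emitter emitter, ByteBuffer message)
                throws InterruptedException {
            String streamName = "names"; // hypothetical stream
            int partition = 3 % emitter.getPartitionCount(streamName);
            Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
            if (destination == null) {
                return false; // no route known for this slot yet
            }
            return emitter.send(destination, message);
        }
    }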

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
new file mode 100644
index 0000000..0ddffb8
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
@@ -0,0 +1,13 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.base.Destination;
+import org.apache.s4.comm.topology.ClusterNode;
+
+public class TCPDestination extends ClusterNode  implements Destination {
+
+    public TCPDestination(int partition, int port, String machineName,
+            String taskId) {
+        super(partition, port, machineName, taskId);
+    }
+
+}
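
TCPDestination just reuses the ClusterNode fields (partition, port, machine name, task id). For illustration only, with hypothetical values:

    // In practice these values come from the cluster topology, not constants.
    TCPDestination destination = new TCPDestination(3, 12000, "node-3.example.com", "Task-3");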

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index d31f04c..266d15f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.s4.base.Destination;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
@@ -84,14 +85,9 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private final ChannelGroup channels = new DefaultChannelGroup();
 
     /*
-     * Channel used to send messages to each partition
+     * Channel used to send messages to each destination
      */
-    private final BiMap<Integer, Channel> partitionChannelMap;
-
-    /*
-     * Node hosting each partition
-     */
-    private final BiMap<Integer, ClusterNode> partitionNodeMap;
+    private final BiMap<Destination, Channel> partitionChannelMap;
 
     // lock for synchronizing between cluster updates callbacks and other code
     private final Lock lock;
@@ -127,7 +123,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
         partitionChannelMap = HashBiMap.create(clusterSize);
-        partitionNodeMap = HashBiMap.create(clusterSize);
 
         // Initialize netty related structures
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -159,63 +154,55 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     }
 
-    private boolean connectTo(Integer partitionId) throws InterruptedException {
-        ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+    private boolean connectTo(TCPDestination destination) throws InterruptedException {
 
-        if (clusterNode == null) {
-
-            logger.error("No ClusterNode exists for partitionId " + partitionId);
-            refreshCluster();
+        if (destination == null) {
             return false;
         }
 
         try {
-            ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(),
-                    clusterNode.getPort()));
+            ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(destination.getMachineName(),
+                    destination.getPort()));
             connectFuture.await();
             if (connectFuture.isSuccess()) {
                 channels.add(connectFuture.getChannel());
-                partitionChannelMap.forcePut(partitionId, connectFuture.getChannel());
+                partitionChannelMap.forcePut(destination, connectFuture.getChannel());
                 return true;
             }
         } catch (InterruptedException ie) {
-            logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
-                    clusterNode.getPort()));
+            logger.error(String.format("Interrupted while connecting to %s:%d", destination.getMachineName(),
+                    destination.getPort()));
             throw ie;
         }
         return false;
     }
-
-    private void sendMessage(int partitionId, ByteBuffer message) throws InterruptedException {
+    
+    @Override
+    public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
         ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
 
-        if (!partitionChannelMap.containsKey(partitionId)) {
-            if (!connectTo(partitionId)) {
-                logger.warn("Could not connect to partition {}, discarding message", partitionId);
+        if (!partitionChannelMap.containsKey(destination)) {
+            if (!connectTo((TCPDestination) destination)) {
+                logger.warn("Could not connect to partition {}, discarding message", destination);
                 // Couldn't connect, discard message
-                return;
+                return false;
             }
         }
 
-        writePermits.get(partitionId).acquire();
+        writePermits.get(destination).acquire();
 
-        Channel c = partitionChannelMap.get(partitionId);
+        Channel c = partitionChannelMap.get(destination);
         if (c == null) {
-            logger.warn("Could not find channel for partition {}", partitionId);
-            return;
+            logger.warn("Could not find channel for destination {}", destination);
+            return false;
         }
 
-        c.write(buffer).addListener(new MessageSendingListener(partitionId));
-    }
-
-    @Override
-    public boolean send(int partitionId, ByteBuffer message) throws InterruptedException {
-        // TODO a possible optimization would be to buffer messages per partition, with a small timeout. This will limit
-        // the number of writes and therefore system calls.
-        sendMessage(partitionId, message);
+        c.write(buffer).addListener(new MessageSendingListener(destination));
         return true;
     }
 
+    
+
     protected void removeChannel(int partition) {
         Channel c = partitionChannelMap.remove(partition);
         if (c == null) {
@@ -252,6 +239,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private void refreshCluster() {
         lock.lock();
         try {
+            /*
             for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
                 Integer partition = clusterNode.getPartition();
                 if (partition == null) {
@@ -267,16 +255,17 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                 if (!writePermits.containsKey(partition)) {
                     writePermits.put(partition, new Semaphore(maxPendingWrites));
                 }
-            }
+                
+            }*/
         } finally {
             lock.unlock();
         }
     }
 
-    @Override
-    public int getPartitionCount() {
-        return topology.getPhysicalCluster().getPartitionCount();
-    }
+//    @Override
+//    public int getPartitionCount() {
+//        return topology.getPhysicalCluster().getPartitionCount();
+//    }
     
     @Override
     public int getPartitionCount(String streamName) {
@@ -301,30 +290,35 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     class MessageSendingListener implements ChannelFutureListener {
 
-        int partitionId = -1;
+        Destination destination = null;
 
-        public MessageSendingListener(int partitionId) {
+        public MessageSendingListener(Destination destination) {
             super();
-            this.partitionId = partitionId;
+            this.destination = destination;
         }
 
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
-            writePermits.get(partitionId).release();
+            writePermits.get(destination).release();
             if (!future.isSuccess()) {
                 try {
                     // TODO handle possible cluster reconfiguration between send and failure callback
                     logger.warn("Failed to send message to node {} (according to current cluster information)",
-                            topology.getPhysicalCluster().getNodes().get(partitionId));
+                            destination);
                 } catch (IndexOutOfBoundsException ignored) {
-                    logger.error("Failed to send message to partition {}", partitionId);
+                    logger.error("Failed to send message to partition {}", destination);
                     // cluster was changed
                 }
             } else {
-                metrics.sentMessage(partitionId);
+                metrics.sentMessage(destination);
 
             }
 
         }
     }
+
+    @Override
+    public String getType() {
+        return "tcp";
+    }
 }
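
refreshCluster() is left commented out above, so the Destination-keyed maps are not repopulated on topology changes yet. A sketch of how it might look under the new keying, assuming writePermits is re-keyed by Destination, that ClusterNode exposes a getTaskId() accessor, and that ClusterNode provides value-based equals/hashCode (none of which is shown in this diff):

    private void refreshCluster() {
        lock.lock();
        try {
            for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
                Integer partition = clusterNode.getPartition();
                if (partition == null) {
                    // Node not yet assigned a partition; skip it for now.
                    continue;
                }
                // Build the transport-specific destination for this node.
                TCPDestination destination = new TCPDestination(partition, clusterNode.getPort(),
                        clusterNode.getMachineName(), clusterNode.getTaskId());
                // One write-permit semaphore per destination, mirroring the old per-partition scheme.
                if (!writePermits.containsKey(destination)) {
                    writePermits.put(destination, new Semaphore(maxPendingWrites));
                }
            }
        } finally {
            lock.unlock();
        }
    }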

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index e3c2eef..75f0d5b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -18,7 +18,8 @@
 
 package org.apache.s4.comm.topology;
 
-import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.base.Destination;
+
 
 /**
  * Represents a logical cluster
@@ -37,7 +38,7 @@ public interface Cluster {
      * @param partitionId
      * @return
      */
-    InstanceConfig getDestination(String streamName, int partitionId);
+    Destination getDestination(String streamName, int partitionId, String type);
 
     Integer getPartitionCount(String streamName);
 }
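
The added type argument lets the same logical (stream, partition) slot resolve to different endpoints per transport. For illustration (stream and partition values are hypothetical):

    // Same logical slot, two transports, potentially two distinct endpoints.
    Destination tcpTarget = cluster.getDestination("names", 3, "tcp");
    Destination udpTarget = cluster.getDestination("names", 3, "udp");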

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 7c49ac0..639c95a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -37,26 +37,31 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.base.Destination;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+import com.sun.org.apache.bcel.internal.generic.NEW;
 
 /**
- * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
- * configuration.
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies
+ * listeners of runtime changes in the configuration.
  * 
  */
 public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
 
-    private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
+    private static Logger logger = LoggerFactory
+            .getLogger(ClusterFromHelix.class);
 
     private final String clusterName;
     private final AtomicReference<PhysicalCluster> clusterRef;
     private final List<ClusterChangeListener> listeners;
     private final Lock lock;
     private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+    // Map of destination type to streamName to partitionId to Destination
+    private final AtomicReference<Map<String, Map<String, Map<String, Destination>>>> destinationInfoMapRef;
 
     /**
      * only the local topology
@@ -65,12 +70,17 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
             @Named("s4.cluster.zk_address") String zookeeperAddress,
             @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
+            throws Exception {
         this.clusterName = clusterName;
         Map<String, Integer> map = Collections.emptyMap();
         partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
         this.clusterRef = new AtomicReference<PhysicalCluster>();
         this.listeners = new ArrayList<ClusterChangeListener>();
+        Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+                .emptyMap();
+        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+                destinationMap);
         lock = new ReentrantLock();
 
     }
@@ -78,36 +88,50 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     /**
      * any topology
      */
-    public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+    public ClusterFromHelix(String clusterName, ZkClient zkClient,
+            String machineId) {
         this.clusterName = clusterName;
         Map<String, Integer> map = Collections.emptyMap();
         partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
         this.clusterRef = new AtomicReference<PhysicalCluster>();
         this.listeners = new ArrayList<ClusterChangeListener>();
+        Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+                .emptyMap();
+        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+                destinationMap);
         lock = new ReentrantLock();
 
     }
 
     @Override
-    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+    public void onExternalViewChange(List<ExternalView> externalViewList,
+            NotificationContext changeContext) {
         lock.lock();
         try {
             logger.info("Start:Processing change in cluster topology");
             super.onExternalViewChange(externalViewList, changeContext);
             HelixManager manager = changeContext.getManager();
-            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            HelixDataAccessor helixDataAccessor = manager
+                    .getHelixDataAccessor();
             ConfigAccessor configAccessor = manager.getConfigAccessor();
             ConfigScopeBuilder builder = new ConfigScopeBuilder();
             Builder keyBuilder = helixDataAccessor.keyBuilder();
-            List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+            List<String> resources = helixDataAccessor.getChildNames(keyBuilder
+                    .idealStates());
             Map<String, Integer> map = new HashMap<String, Integer>();
+            Map<String, Map<String, Map<String, Destination>>> destinationRoutingMap;
+            destinationRoutingMap = new HashMap<String, Map<String,Map<String,Destination>>>();
             for (String resource : resources) {
-                String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
-                        "type");
+                String resourceType = configAccessor.get(
+                        builder.forCluster(clusterName).forResource(resource)
+                                .build(), "type");
                 if ("Task".equalsIgnoreCase(resourceType)) {
-                    String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
-                            .build(), "streamName");
-                    IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+                    String streamName = configAccessor.get(
+                            builder.forCluster(clusterName)
+                                    .forResource(resource).build(),
+                            "streamName");
+                    IdealState idealstate = helixDataAccessor
+                            .getProperty(keyBuilder.idealStates(resource));
                     map.put(streamName, idealstate.getNumPartitions());
                 }
             }
@@ -146,7 +170,8 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        result = prime * result
+                + ((clusterName == null) ? 0 : clusterName.hashCode());
         return result;
     }
 
@@ -168,13 +193,18 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     }
 
     @Override
-    public InstanceConfig getDestination(String streamName, int partitionId) {
-        List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
-        if (instances.size() == 1) {
-            return instances.get(0);
-        } else {
+    public Destination getDestination(String streamName, int partitionId,
+            String destinationType) {
+
+        Map<String, Map<String, Destination>> typeMap = destinationInfoMapRef.get().get(destinationType);
+        if (typeMap == null)
             return null;
-        }
+
+        Map<String, Destination> streamMap = typeMap.get(streamName);
+        if (streamMap == null)
+            return null;
+
+        return streamMap.get(partitionId);
     }
 
     @Override
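
destinationInfoMapRef holds a three-level routing map, destination type -> stream name -> partition id -> Destination, but this commit only initializes it to an empty map. A sketch of the kind of code the remaining work implies, publishing one entry from onExternalViewChange (host, port and task id are hypothetical, and partition ids are stored under their string form to match the declared key type):

    Map<String, Map<String, Map<String, Destination>>> destinationRoutingMap =
            new HashMap<String, Map<String, Map<String, Destination>>>();
    Map<String, Map<String, Destination>> tcpRouting = new HashMap<String, Map<String, Destination>>();
    Map<String, Destination> namesStream = new HashMap<String, Destination>();
    namesStream.put("3", new TCPDestination(3, 12000, "node-3.example.com", "Task-3"));
    tcpRouting.put("names", namesStream);
    destinationRoutingMap.put("tcp", tcpRouting);
    // Swap in the freshly built routing map atomically.
    destinationInfoMapRef.set(destinationRoutingMap);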

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index f2b60d5..27bd354 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.base.Destination;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,7 +211,7 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
 
     }
     @Override
-    public InstanceConfig getDestination(String streamName, int partitionId) {
+    public Destination getDestination(String streamName, int partitionId, String type) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 233316d..1d13cf4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -90,13 +90,13 @@ public class PhysicalCluster {
         }
     }
 
-    /**
-     * 
-     * @return Number of partitions in the cluster.
-     */
-    public int getPartitionCount() {
-        return numPartitions;
-    }
+//    /**
+//     * 
+//     * @return Number of partitions in the cluster.
+//     */
+//    public int getPartitionCount() {
+//        return numPartitions;
+//    }
     
     /**
      * 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
new file mode 100644
index 0000000..b424f0c
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
@@ -0,0 +1,14 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.base.Destination;
+import org.apache.s4.comm.topology.ClusterNode;
+
+public class UDPDestination extends ClusterNode implements Destination{
+
+    public UDPDestination(int partition, int port, String machineName,
+            String taskId) {
+        super(partition, port, machineName, taskId);
+        // TODO Auto-generated constructor stub
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index b6450b5..51e3e0b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.s4.base.Destination;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
@@ -45,7 +46,7 @@ import com.google.inject.Inject;
 public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
     private final HashBiMap<Integer, ClusterNode> nodes;
-    private final Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
+    private final Map<Destination, InetAddress> inetCache = new HashMap<Destination, InetAddress>();
     private final long messageDropInQueueCount = 0;
     private final Cluster topology;
 
@@ -80,23 +81,17 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
     }
 
     @Override
-    public boolean send(int partitionId, ByteBuffer message) {
-        try {
-            ClusterNode node = nodes.get(partitionId);
-            if (node == null) {
-                LoggerFactory.getLogger(getClass()).error(
-                        "Cannot send message to partition {} because this partition is not visible to this emitter",
-                        partitionId);
-                return false;
-            }
+    public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
+        try{
+            UDPDestination udpDestination = (UDPDestination) destination;
             byte[] byteBuffer = new byte[message.array().length];
             System.arraycopy(message.array(), 0, byteBuffer, 0, message.array().length);
-            InetAddress inetAddress = inetCache.get(partitionId);
+            InetAddress inetAddress = inetCache.get(destination);
             if (inetAddress == null) {
-                inetAddress = InetAddress.getByName(node.getMachineName());
-                inetCache.put(partitionId, inetAddress);
+                inetAddress = InetAddress.getByName(udpDestination.getMachineName());
+                inetCache.put(destination, inetAddress);
             }
-            DatagramPacket dp = new DatagramPacket(byteBuffer, byteBuffer.length, inetAddress, node.getPort());
+            DatagramPacket dp = new DatagramPacket(byteBuffer, byteBuffer.length, inetAddress, udpDestination.getPort());
             socket.send(dp);
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -105,10 +100,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         return true;
     }
 
-    @Override
-    public int getPartitionCount() {
-        return topology.getPhysicalCluster().getPartitionCount();
-    }
+//    @Override
+//    public int getPartitionCount() {
+//        return topology.getPhysicalCluster().getPartitionCount();
+//    }
     
     @Override
     public int getPartitionCount(String streamName) {
@@ -136,4 +131,9 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public String getType() {
+        return "udp";
+    }
 }
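
The UDP emitter casts the incoming Destination to UDPDestination, so the lookup type must match the emitter's transport. A hedged usage sketch (the cluster, partition, event and serDeser references are assumed to be in scope; the stream name is illustrative):

    // Resolve through the cluster with the emitter's own type so the cast in send() holds.
    Destination destination = cluster.getDestination("names", partition, udpEmitter.getType());
    udpEmitter.send(destination, serDeser.serialize(event));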

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
index 2fe7e3e..41777cb 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -1,7 +1,10 @@
 package org.apache.s4.comm.util;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.s4.base.Destination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.Cluster;
 
@@ -10,17 +13,29 @@ import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Meter;
 
 public class EmitterMetrics {
-    private final Meter[] emittersMeters;
+    private final Map<String, Map<String, Meter>> emittersMetersMap;
+    private Cluster cluster;
 
     public EmitterMetrics(Cluster cluster) {
-        emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
-        for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
-            emittersMeters[i] = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
-                    + cluster.getPhysicalCluster().getName() + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
-        }
+        this.cluster = cluster;
+        emittersMetersMap = new HashMap<String, Map<String, Meter>>();
     }
 
-    public void sentMessage(int partitionId) {
-        emittersMeters[partitionId].mark();
+    public void sentMessage(Destination destination) {
+        //TODO
+        /*
+        Map<String, Meter> map = emittersMetersMap.get(stream);
+        if (map == null) {
+            map = new HashMap<String, Meter>();
+            emittersMetersMap.put(stream, map);
+        }
+        Meter meter = emittersMetersMap.get(stream).get(partitionId);
+        if (meter == null) {
+            meter = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
+                    + cluster.getPhysicalCluster().getName() + "@stream-"+ stream + "@partition-"
+                    + partitionId, "event-emitted", TimeUnit.SECONDS);
+            emittersMetersMap.get(stream).put(partitionId, meter);
+        }
+        */
     }
 }
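
sentMessage() is currently a no-op because the old per-partition Meter array no longer fits Destination-based routing, and the commented-out stream/partition version would also need the stream name passed in. One possible interim sketch, keyed by the destination's string form and assuming a flatMeters field that is not part of this commit:

    // Hypothetical flat registry: one meter per destination, created lazily.
    private final Map<String, Meter> flatMeters = new HashMap<String, Meter>();

    public void sentMessage(Destination destination) {
        String key = String.valueOf(destination);
        Meter meter = flatMeters.get(key);
        if (meter == null) {
            meter = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
                    + cluster.getPhysicalCluster().getName() + "@" + key, "event-emitted", TimeUnit.SECONDS);
            flatMeters.put(key, meter);
        }
        meter.mark();
    }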

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..2e14173 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -28,6 +28,7 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.comm.topology.StreamConsumer;
@@ -83,8 +84,9 @@ public class DefaultRemoteSenders implements RemoteSenders {
             // represented by a single stream consumer
             RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
             if (sender == null) {
-                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
-                        .getClusterName())), hasher, consumer.getClusterName());
+                Cluster cluster = remoteClusters.getCluster(consumer
+                        .getClusterName());
+                RemoteSender newSender = new RemoteSender(cluster,remoteEmitters.getEmitter(cluster), hasher, consumer.getClusterName());
                 // TODO cleanup when remote topologies die
                 sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
                 if (sender == null) {
@@ -112,7 +114,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
         @Override
         public void run() {
             try {
-                sender.send(hashKey, serDeser.serialize(event));
+                sender.send(event.getStreamName(),hashKey, serDeser.serialize(event));
             } catch (InterruptedException e) {
                 logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
                 Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index dba8b6c..ad9e02d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -21,8 +21,10 @@ package org.apache.s4.core;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.s4.base.Destination;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
+import org.apache.s4.comm.topology.Cluster;
 
 /**
  * Sends events to a remote cluster.
@@ -34,23 +36,27 @@ public class RemoteSender {
     final private Hasher hasher;
     AtomicInteger targetPartition = new AtomicInteger();
     final private String remoteClusterName;
+    private Cluster cluster;
 
-    public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
+    public RemoteSender(Cluster cluster, Emitter emitter, Hasher hasher, String clusterName) {
         super();
+        this.cluster = cluster;
         this.emitter = emitter;
         this.hasher = hasher;
         this.remoteClusterName = clusterName;
 
     }
 
-    public void send(String hashKey, ByteBuffer message) throws InterruptedException {
+    public void send(String streamName,String hashKey, ByteBuffer message) throws InterruptedException {
         int partition;
         if (hashKey == null) {
             // round robin by default
-            partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount());
+            partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount(streamName));
         } else {
-            partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+            partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(streamName));
         }
-        emitter.send(partition, message);
+        //TODO: where do we get the mode
+        Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
+        emitter.send(destination, message);
     }
 }
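
The remote path now needs the stream name at send time so that both the per-stream partition count and the Destination can be resolved. Illustrative calls (stream name and key are made up; serDeser and event are assumed in scope):

    remoteSender.send("names", null, serDeser.serialize(event));      // round robin across partitions
    remoteSender.send("names", "user-42", serDeser.serialize(event)); // keyed, stable partition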

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index b103ad8..5ade15d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -21,12 +21,14 @@ package org.apache.s4.core;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.s4.base.Destination;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.util.S4Metrics;
@@ -59,6 +61,8 @@ public class SenderImpl implements Sender {
     @Inject
     S4Metrics metrics;
 
+    private Cluster cluster;
+
     /**
      * 
      * @param emitter
@@ -70,11 +74,12 @@ public class SenderImpl implements Sender {
      */
     @Inject
     public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
-            SenderExecutorServiceFactory senderExecutorServiceFactory) {
+            SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
         this.assignment = assignment;
+        this.cluster = cluster;
         this.tpe = senderExecutorServiceFactory.create();
     }
 
@@ -93,7 +98,7 @@ public class SenderImpl implements Sender {
      */
     @Override
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
         if (partition == localPartitionId) {
             metrics.sentLocal();
             /* Hey we are in the same JVM, don't use the network. */
@@ -132,7 +137,9 @@ public class SenderImpl implements Sender {
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
             try {
-                emitter.send(remotePartitionId, serializedEvent);
+                //TODO: where can we get the type ?
+                Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
+                emitter.send(destination, serializedEvent);
             } catch (InterruptedException e) {
                 logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
                 Thread.currentThread().interrupt();
@@ -154,13 +161,15 @@ public class SenderImpl implements Sender {
         @Override
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
-
-            for (int i = 0; i < emitter.getPartitionCount(); i++) {
+            Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
+            for (int i = 0; i < partitionCount; i++) {
 
                 /* Don't use the comm layer when we send to the same partition. */
                 if (localPartitionId != i) {
                     try {
-                        emitter.send(i, serializedEvent);
+                        //TODO: where to get the mode from
+                        Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
+                        emitter.send(destination, serializedEvent);
                         metrics.sentEvent(i);
                     } catch (InterruptedException e) {
                         logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 0b81bbe..c946385 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -59,7 +59,8 @@ public class S4Metrics {
 
     @Inject
     private void init() {
-        senderMeters = new Meter[emitter.getPartitionCount()];
+        //TODO: FIX METER
+        senderMeters = new Meter[100];
         // int localPartitionId = assignment.assignClusterNode().getPartition();
         for (int i = 0; i < senderMeters.length; i++) {
             senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index fddbd03..e640066 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -9,6 +9,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.IdealState;
+import org.apache.s4.base.Destination;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.TCPEmitter;
@@ -46,7 +47,8 @@ public class GenericEventAdapter {
                 KryoSerDeser serializer = new KryoSerDeser(classLoader);
 //                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
                 System.out.println("Sending event to partition:" + partitionId);
-                emitter.send(partitionId, serializer.serialize(event));
+                Destination destination = cluster.getDestination(adapterArgs.streamName, partitionId, emitter.getType());
+                emitter.send(destination, serializer.serialize(event));
                 Thread.sleep(1000);
             }
         } catch (Exception e) {