You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/08 05:51:46 UTC

git commit: [S4-110] Adding support to skip network hop if sending locally

Updated Branches:
  refs/heads/S4-110-new 2b718c98b -> 75981b873


[S4-110] Adding support to skip network hop if sending locally


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/75981b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/75981b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/75981b87

Branch: refs/heads/S4-110-new
Commit: 75981b873c9f7d2900a264bcadeff550ac423dac
Parents: 2b718c9
Author: Kishore G <ki...@apache.org>
Authored: Thu Feb 7 21:51:33 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Thu Feb 7 21:51:36 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    3 +-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   86 ++++++++++----
 2 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/75981b87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index f0546a6..cca28b2 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -308,8 +308,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                     // cluster was changed
                 }
             } else {
-                //TODO:
-                //metrics.sentMessage(destination);
+                metrics.sentMessage(destination);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/75981b87/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 5e4da13..6923f98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -27,6 +27,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterNode;
@@ -38,12 +39,14 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 
 /**
- * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top
+ * level classes of the communication layer.
  * <p>
- * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
+ * {@link SenderImpl} is responsible for sending an event to a
+ * {@link ProcessingElement} instance using a hashKey.
  * <p>
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
+ * Details on how the cluster is partitioned and how events are serialized and
+ * transmitted to its destination are hidden from the application developer.
  */
 public class SenderImpl implements Sender {
 
@@ -54,7 +57,7 @@ public class SenderImpl implements Sender {
     final private Hasher hasher;
 
     Assignment assignment;
-    private int localPartitionId = -1;
+    private ClusterNode localNode;
 
     private final ExecutorService tpe;
 
@@ -73,8 +76,10 @@ public class SenderImpl implements Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
-            SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
+    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser,
+            Hasher hasher, Assignment assignment,
+            SenderExecutorServiceFactory senderExecutorServiceFactory,
+            Cluster cluster) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
@@ -87,19 +92,23 @@ public class SenderImpl implements Sender {
     private void resolveLocalPartitionId() {
         ClusterNode node = assignment.assignClusterNode();
         if (node != null) {
-            localPartitionId = node.getPartition();
+            localNode = node;
         }
     }
 
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
+     * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String,
+     * org.apache.s4.base.Event)
      */
     @Override
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
-        if (partition == localPartitionId) {
+        int partition = (int) (hasher.hash(hashKey) % emitter
+                .getPartitionCount(event.getStreamName()));
+        Destination destination = cluster.getDestination(event.getStreamName(),
+                partition, emitter.getType());
+        if (isDestinationLocal(destination)) {
             metrics.sentLocal();
             /* Hey we are in the same JVM, don't use the network. */
             return false;
@@ -109,6 +118,20 @@ public class SenderImpl implements Sender {
         return true;
     }
 
+    /** Tells whether {@code destination} has the local node's host and port (TCP only). */
+    private boolean isDestinationLocal(Destination destination) {
+        // Constant-first equals tolerates a null emitter type; instanceof
+        // guards the cast below and also rejects a null destination.
+        if (localNode == null || !"tcp".equals(emitter.getType())
+                || !(destination instanceof TCPDestination)) {
+            return false;
+        }
+        TCPDestination tcpDestination = (TCPDestination) destination;
+        return localNode.getMachineName().equalsIgnoreCase(
+                tcpDestination.getMachineName())
+                && localNode.getPort() == tcpDestination.getPort();
+    }
+
     private void send(int partition, Event event) {
         tpe.submit(new SerializeAndSendToRemotePartitionTask(event, partition));
     }
@@ -116,7 +139,9 @@ public class SenderImpl implements Sender {
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event)
+     * @see
+     * org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event
+     * )
      */
     @Override
     public void sendToAllRemotePartitions(Event event) {
@@ -128,7 +153,8 @@ public class SenderImpl implements Sender {
         Event event;
         int remotePartitionId;
 
-        public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
+        public SerializeAndSendToRemotePartitionTask(Event event,
+                int remotePartitionId) {
             this.event = event;
             this.remotePartitionId = remotePartitionId;
         }
@@ -138,10 +164,14 @@ public class SenderImpl implements Sender {
             ByteBuffer serializedEvent = serDeser.serialize(event);
             try {
                 // TODO: where can we get the type ?
-                Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
+                Destination destination = cluster.getDestination(
+                        event.getStreamName(), remotePartitionId,
+                        emitter.getType());
                 emitter.send(destination, serializedEvent);
             } catch (InterruptedException e) {
-                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+                logger.error(
+                        "Interrupted blocking send operation for event {}. Event is lost.",
+                        event);
                 Thread.currentThread().interrupt();
             }
 
@@ -161,23 +191,29 @@ public class SenderImpl implements Sender {
         @Override
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
-            Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
+            Integer partitionCount = cluster.getPartitionCount(event
+                    .getStreamName());
             for (int i = 0; i < partitionCount; i++) {
 
                 /* Don't use the comm layer when we send to the same partition. */
-                if (localPartitionId != i) {
-                    try {
-                        // TODO: where to get the mode from
-                        Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
+                try {
+                    // TODO: where to get the mode from
+                    Destination destination = cluster.getDestination(
+                            event.getStreamName(), i, "tcp");
+                    if (!isDestinationLocal(destination)) {
                         emitter.send(destination, serializedEvent);
                         metrics.sentEvent(i);
-                    } catch (InterruptedException e) {
-                        logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
-                        // no reason to continue: we were interrupted, so we reset the interrupt status and leave
-                        Thread.currentThread().interrupt();
-                        break;
                     }
+                } catch (InterruptedException e) {
+                    logger.error(
+                            "Interrupted blocking send operation for event {}. Event is lost.",
+                            event);
+                    // no reason to continue: we were interrupted, so we reset
+                    // the interrupt status and leave
+                    Thread.currentThread().interrupt();
+                    break;
                 }
+
             }
 
         }