You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/08 05:51:46 UTC
git commit: [S4-110] Adding support to skip network hop if sending locally
Updated Branches:
refs/heads/S4-110-new 2b718c98b -> 75981b873
[S4-110] Adding support to skip network hop if sending locally
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/75981b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/75981b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/75981b87
Branch: refs/heads/S4-110-new
Commit: 75981b873c9f7d2900a264bcadeff550ac423dac
Parents: 2b718c9
Author: Kishore G <ki...@apache.org>
Authored: Thu Feb 7 21:51:33 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Thu Feb 7 21:51:36 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 3 +-
.../main/java/org/apache/s4/core/SenderImpl.java | 86 ++++++++++----
2 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/75981b87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index f0546a6..cca28b2 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -308,8 +308,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
// cluster was changed
}
} else {
- //TODO:
- //metrics.sentMessage(destination);
+ metrics.sentMessage(destination);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/75981b87/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 5e4da13..6923f98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -27,6 +27,7 @@ import org.apache.s4.base.Event;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterNode;
@@ -38,12 +39,14 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
/**
- * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top
+ * level classes of the communication layer.
* <p>
- * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
+ * {@link SenderImpl} is responsible for sending an event to a
+ * {@link ProcessingElement} instance using a hashKey.
* <p>
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
+ * Details on how the cluster is partitioned and how events are serialized and
+ * transmitted to its destination are hidden from the application developer.
*/
public class SenderImpl implements Sender {
@@ -54,7 +57,7 @@ public class SenderImpl implements Sender {
final private Hasher hasher;
Assignment assignment;
- private int localPartitionId = -1;
+ private ClusterNode localNode;
private final ExecutorService tpe;
@@ -73,8 +76,10 @@ public class SenderImpl implements Sender {
* a hashing function to map keys to partition IDs.
*/
@Inject
- public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
- SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
+ public SenderImpl(Emitter emitter, SerializerDeserializer serDeser,
+ Hasher hasher, Assignment assignment,
+ SenderExecutorServiceFactory senderExecutorServiceFactory,
+ Cluster cluster) {
this.emitter = emitter;
this.serDeser = serDeser;
this.hasher = hasher;
@@ -87,19 +92,23 @@ public class SenderImpl implements Sender {
private void resolveLocalPartitionId() {
ClusterNode node = assignment.assignClusterNode();
if (node != null) {
- localPartitionId = node.getPartition();
+ localNode = node;
}
}
/*
* (non-Javadoc)
*
- * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
+ * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String,
+ * org.apache.s4.base.Event)
*/
@Override
public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
- if (partition == localPartitionId) {
+ int partition = (int) (hasher.hash(hashKey) % emitter
+ .getPartitionCount(event.getStreamName()));
+ Destination destination = cluster.getDestination(event.getStreamName(),
+ partition, emitter.getType());
+ if (isDestinationLocal(destination)) {
metrics.sentLocal();
/* Hey we are in the same JVM, don't use the network. */
return false;
@@ -109,6 +118,20 @@ public class SenderImpl implements Sender {
return true;
}
+ private boolean isDestinationLocal(Destination destination) {
+ if (emitter.getType().equals("tcp")) {
+ TCPDestination tcpDestination = ((TCPDestination) destination);
+ if (localNode != null
+ && localNode.getMachineName().equalsIgnoreCase(
+ tcpDestination.getMachineName())
+ && localNode.getPort() == tcpDestination.getPort()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private void send(int partition, Event event) {
tpe.submit(new SerializeAndSendToRemotePartitionTask(event, partition));
}
@@ -116,7 +139,9 @@ public class SenderImpl implements Sender {
/*
* (non-Javadoc)
*
- * @see org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event)
+ * @see
+ * org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event
+ * )
*/
@Override
public void sendToAllRemotePartitions(Event event) {
@@ -128,7 +153,8 @@ public class SenderImpl implements Sender {
Event event;
int remotePartitionId;
- public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
+ public SerializeAndSendToRemotePartitionTask(Event event,
+ int remotePartitionId) {
this.event = event;
this.remotePartitionId = remotePartitionId;
}
@@ -138,10 +164,14 @@ public class SenderImpl implements Sender {
ByteBuffer serializedEvent = serDeser.serialize(event);
try {
// TODO: where can we get the type ?
- Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
+ Destination destination = cluster.getDestination(
+ event.getStreamName(), remotePartitionId,
+ emitter.getType());
emitter.send(destination, serializedEvent);
} catch (InterruptedException e) {
- logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+ logger.error(
+ "Interrupted blocking send operation for event {}. Event is lost.",
+ event);
Thread.currentThread().interrupt();
}
@@ -161,23 +191,29 @@ public class SenderImpl implements Sender {
@Override
public void run() {
ByteBuffer serializedEvent = serDeser.serialize(event);
- Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
+ Integer partitionCount = cluster.getPartitionCount(event
+ .getStreamName());
for (int i = 0; i < partitionCount; i++) {
/* Don't use the comm layer when we send to the same partition. */
- if (localPartitionId != i) {
- try {
- // TODO: where to get the mode from
- Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
+ try {
+ // TODO: where to get the mode from
+ Destination destination = cluster.getDestination(
+ event.getStreamName(), i, "tcp");
+ if (!isDestinationLocal(destination)) {
emitter.send(destination, serializedEvent);
metrics.sentEvent(i);
- } catch (InterruptedException e) {
- logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
- // no reason to continue: we were interrupted, so we reset the interrupt status and leave
- Thread.currentThread().interrupt();
- break;
}
+ } catch (InterruptedException e) {
+ logger.error(
+ "Interrupted blocking send operation for event {}. Event is lost.",
+ event);
+ // no reason to continue: we were interrupted, so we reset
+ // the interrupt status and leave
+ Thread.currentThread().interrupt();
+ break;
}
+
}
}