You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/07 03:04:35 UTC
git commit: Interface changes for emitter to accept Destination
instead of partition. Fairly invasive. non-functional
Updated Branches:
refs/heads/S4-110-new 24a11e18b -> a65684e9b
Interface changes for emitter to accept Destination instead of partition. Fairly invasive. non-functional
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a65684e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a65684e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a65684e9
Branch: refs/heads/S4-110-new
Commit: a65684e9b54d5f2b8710957ee34abe27a5b37b89
Parents: 24a11e1
Author: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Authored: Wed Feb 6 19:04:19 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Wed Feb 6 19:04:19 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/s4/base/Destination.java | 5 +
.../src/main/java/org/apache/s4/base/Emitter.java | 8 +-
.../org/apache/s4/comm/tcp/TCPDestination.java | 13 ++
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 92 +++++++--------
.../java/org/apache/s4/comm/topology/Cluster.java | 5 +-
.../apache/s4/comm/topology/ClusterFromHelix.java | 70 ++++++++---
.../org/apache/s4/comm/topology/ClusterFromZK.java | 3 +-
.../apache/s4/comm/topology/PhysicalCluster.java | 14 +-
.../org/apache/s4/comm/udp/UDPDestination.java | 14 +++
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 36 +++---
.../org/apache/s4/comm/util/EmitterMetrics.java | 31 ++++--
.../org/apache/s4/core/DefaultRemoteSenders.java | 8 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 16 ++-
.../main/java/org/apache/s4/core/SenderImpl.java | 21 +++-
.../java/org/apache/s4/core/util/S4Metrics.java | 3 +-
.../apache/s4/tools/helix/GenericEventAdapter.java | 4 +-
16 files changed, 219 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
new file mode 100644
index 0000000..7b099f6
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Destination.java
@@ -0,0 +1,5 @@
+package org.apache.s4.base;
+
+public interface Destination {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index e2ee412..fbc89b4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -37,11 +37,13 @@ public interface Emitter {
* @throws InterruptedException
* if interrupted during blocking send operation
*/
- boolean send(int partitionId, ByteBuffer message) throws InterruptedException;
+ boolean send(Destination destination , ByteBuffer message) throws InterruptedException;
- int getPartitionCount();
+ //int getPartitionCount();
int getPartitionCount(String stream);
-
+
+ String getType();
+
void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
new file mode 100644
index 0000000..0ddffb8
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
@@ -0,0 +1,13 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.base.Destination;
+import org.apache.s4.comm.topology.ClusterNode;
+
+public class TCPDestination extends ClusterNode implements Destination {
+
+ public TCPDestination(int partition, int port, String machineName,
+ String taskId) {
+ super(partition, port, machineName, taskId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index d31f04c..266d15f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.s4.base.Destination;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
@@ -84,14 +85,9 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
private final ChannelGroup channels = new DefaultChannelGroup();
/*
- * Channel used to send messages to each partition
+ * Channel used to send messages to each destination
*/
- private final BiMap<Integer, Channel> partitionChannelMap;
-
- /*
- * Node hosting each partition
- */
- private final BiMap<Integer, ClusterNode> partitionNodeMap;
+ private final BiMap<Destination, Channel> partitionChannelMap;
// lock for synchronizing between cluster updates callbacks and other code
private final Lock lock;
@@ -127,7 +123,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
// Initialize data structures
int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
partitionChannelMap = HashBiMap.create(clusterSize);
- partitionNodeMap = HashBiMap.create(clusterSize);
// Initialize netty related structures
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -159,63 +154,55 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
- private boolean connectTo(Integer partitionId) throws InterruptedException {
- ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+ private boolean connectTo(TCPDestination destination) throws InterruptedException {
- if (clusterNode == null) {
-
- logger.error("No ClusterNode exists for partitionId " + partitionId);
- refreshCluster();
+ if (destination == null) {
return false;
}
try {
- ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(),
- clusterNode.getPort()));
+ ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(destination.getMachineName(),
+ destination.getPort()));
connectFuture.await();
if (connectFuture.isSuccess()) {
channels.add(connectFuture.getChannel());
- partitionChannelMap.forcePut(partitionId, connectFuture.getChannel());
+ partitionChannelMap.forcePut(destination, connectFuture.getChannel());
return true;
}
} catch (InterruptedException ie) {
- logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
- clusterNode.getPort()));
+ logger.error(String.format("Interrupted while connecting to %s:%d", destination.getMachineName(),
+ destination.getPort()));
throw ie;
}
return false;
}
-
- private void sendMessage(int partitionId, ByteBuffer message) throws InterruptedException {
+
+ @Override
+ public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
- if (!partitionChannelMap.containsKey(partitionId)) {
- if (!connectTo(partitionId)) {
- logger.warn("Could not connect to partition {}, discarding message", partitionId);
+ if (!partitionChannelMap.containsKey(destination)) {
+ if (!connectTo((TCPDestination) destination)) {
+ logger.warn("Could not connect to partition {}, discarding message", destination);
// Couldn't connect, discard message
- return;
+ return false;
}
}
- writePermits.get(partitionId).acquire();
+ writePermits.get(destination).acquire();
- Channel c = partitionChannelMap.get(partitionId);
+ Channel c = partitionChannelMap.get(destination);
if (c == null) {
- logger.warn("Could not find channel for partition {}", partitionId);
- return;
+ logger.warn("Could not find channel for destination {}", destination);
+ return false;
}
- c.write(buffer).addListener(new MessageSendingListener(partitionId));
- }
-
- @Override
- public boolean send(int partitionId, ByteBuffer message) throws InterruptedException {
- // TODO a possible optimization would be to buffer messages per partition, with a small timeout. This will limit
- // the number of writes and therefore system calls.
- sendMessage(partitionId, message);
+ c.write(buffer).addListener(new MessageSendingListener(destination));
return true;
}
+
+
protected void removeChannel(int partition) {
Channel c = partitionChannelMap.remove(partition);
if (c == null) {
@@ -252,6 +239,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
private void refreshCluster() {
lock.lock();
try {
+ /*
for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
if (partition == null) {
@@ -267,16 +255,17 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
if (!writePermits.containsKey(partition)) {
writePermits.put(partition, new Semaphore(maxPendingWrites));
}
- }
+
+ }*/
} finally {
lock.unlock();
}
}
- @Override
- public int getPartitionCount() {
- return topology.getPhysicalCluster().getPartitionCount();
- }
+// @Override
+// public int getPartitionCount() {
+// return topology.getPhysicalCluster().getPartitionCount();
+// }
@Override
public int getPartitionCount(String streamName) {
@@ -301,30 +290,35 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
class MessageSendingListener implements ChannelFutureListener {
- int partitionId = -1;
+ Destination destination = null;
- public MessageSendingListener(int partitionId) {
+ public MessageSendingListener(Destination destination) {
super();
- this.partitionId = partitionId;
+ this.destination = destination;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- writePermits.get(partitionId).release();
+ writePermits.get(destination).release();
if (!future.isSuccess()) {
try {
// TODO handle possible cluster reconfiguration between send and failure callback
logger.warn("Failed to send message to node {} (according to current cluster information)",
- topology.getPhysicalCluster().getNodes().get(partitionId));
+ destination);
} catch (IndexOutOfBoundsException ignored) {
- logger.error("Failed to send message to partition {}", partitionId);
+ logger.error("Failed to send message to partition {}", destination);
// cluster was changed
}
} else {
- metrics.sentMessage(partitionId);
+ metrics.sentMessage(destination);
}
}
}
+
+ @Override
+ public String getType() {
+ return "tcp";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index e3c2eef..75f0d5b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -18,7 +18,8 @@
package org.apache.s4.comm.topology;
-import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.base.Destination;
+
/**
* Represents a logical cluster
@@ -37,7 +38,7 @@ public interface Cluster {
* @param partitionId
* @return
*/
- InstanceConfig getDestination(String streamName, int partitionId);
+ Destination getDestination(String streamName, int partitionId, String type);
Integer getPartitionCount(String streamName);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 7c49ac0..639c95a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -37,26 +37,31 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.base.Destination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.name.Named;
+import com.sun.org.apache.bcel.internal.generic.NEW;
/**
- * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
- * configuration.
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies
+ * listeners of runtime changes in the configuration.
*
*/
public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
- private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
+ private static Logger logger = LoggerFactory
+ .getLogger(ClusterFromHelix.class);
private final String clusterName;
private final AtomicReference<PhysicalCluster> clusterRef;
private final List<ClusterChangeListener> listeners;
private final Lock lock;
private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+ // Map of destination type to streamName to partitionId to Destination
+ private final AtomicReference<Map<String, Map<String, Map<String, Destination>>>> destinationInfoMapRef;
/**
* only the local topology
@@ -65,12 +70,17 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
@Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
+ throws Exception {
this.clusterName = clusterName;
Map<String, Integer> map = Collections.emptyMap();
partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
this.clusterRef = new AtomicReference<PhysicalCluster>();
this.listeners = new ArrayList<ClusterChangeListener>();
+ Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+ .emptyMap();
+ destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+ destinationMap);
lock = new ReentrantLock();
}
@@ -78,36 +88,50 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
/**
* any topology
*/
- public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+ public ClusterFromHelix(String clusterName, ZkClient zkClient,
+ String machineId) {
this.clusterName = clusterName;
Map<String, Integer> map = Collections.emptyMap();
partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
this.clusterRef = new AtomicReference<PhysicalCluster>();
this.listeners = new ArrayList<ClusterChangeListener>();
+ Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+ .emptyMap();
+ destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+ destinationMap);
lock = new ReentrantLock();
}
@Override
- public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext) {
lock.lock();
try {
logger.info("Start:Processing change in cluster topology");
super.onExternalViewChange(externalViewList, changeContext);
HelixManager manager = changeContext.getManager();
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ HelixDataAccessor helixDataAccessor = manager
+ .getHelixDataAccessor();
ConfigAccessor configAccessor = manager.getConfigAccessor();
ConfigScopeBuilder builder = new ConfigScopeBuilder();
Builder keyBuilder = helixDataAccessor.keyBuilder();
- List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+ List<String> resources = helixDataAccessor.getChildNames(keyBuilder
+ .idealStates());
Map<String, Integer> map = new HashMap<String, Integer>();
+ Map<String, Map<String, Map<String, Destination>>> destinationRoutingMap;
+ destinationRoutingMap = new HashMap<String, Map<String,Map<String,Destination>>>();
for (String resource : resources) {
- String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
- "type");
+ String resourceType = configAccessor.get(
+ builder.forCluster(clusterName).forResource(resource)
+ .build(), "type");
if ("Task".equalsIgnoreCase(resourceType)) {
- String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
- .build(), "streamName");
- IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+ String streamName = configAccessor.get(
+ builder.forCluster(clusterName)
+ .forResource(resource).build(),
+ "streamName");
+ IdealState idealstate = helixDataAccessor
+ .getProperty(keyBuilder.idealStates(resource));
map.put(streamName, idealstate.getNumPartitions());
}
}
@@ -146,7 +170,8 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+ result = prime * result
+ + ((clusterName == null) ? 0 : clusterName.hashCode());
return result;
}
@@ -168,13 +193,18 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
}
@Override
- public InstanceConfig getDestination(String streamName, int partitionId) {
- List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
- if (instances.size() == 1) {
- return instances.get(0);
- } else {
+ public Destination getDestination(String streamName, int partitionId,
+ String destinationType) {
+
+ Map<String, Map<String, Destination>> typeMap = destinationInfoMapRef.get().get(destinationType);
+ if (typeMap == null)
return null;
- }
+
+ Map<String, Destination> streamMap = typeMap.get(streamName);
+ if (streamMap == null)
+ return null;
+
+ return streamMap.get(partitionId);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index f2b60d5..27bd354 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.base.Destination;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +211,7 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
}
@Override
- public InstanceConfig getDestination(String streamName, int partitionId) {
+ public Destination getDestination(String streamName, int partitionId, String type) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 233316d..1d13cf4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -90,13 +90,13 @@ public class PhysicalCluster {
}
}
- /**
- *
- * @return Number of partitions in the cluster.
- */
- public int getPartitionCount() {
- return numPartitions;
- }
+// /**
+// *
+// * @return Number of partitions in the cluster.
+// */
+// public int getPartitionCount() {
+// return numPartitions;
+// }
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
new file mode 100644
index 0000000..b424f0c
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
@@ -0,0 +1,14 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.base.Destination;
+import org.apache.s4.comm.topology.ClusterNode;
+
+public class UDPDestination extends ClusterNode implements Destination{
+
+ public UDPDestination(int partition, int port, String machineName,
+ String taskId) {
+ super(partition, port, machineName, taskId);
+ // TODO Auto-generated constructor stub
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index b6450b5..51e3e0b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.s4.base.Destination;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
@@ -45,7 +46,7 @@ import com.google.inject.Inject;
public class UDPEmitter implements Emitter, ClusterChangeListener {
private DatagramSocket socket;
private final HashBiMap<Integer, ClusterNode> nodes;
- private final Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
+ private final Map<Destination, InetAddress> inetCache = new HashMap<Destination, InetAddress>();
private final long messageDropInQueueCount = 0;
private final Cluster topology;
@@ -80,23 +81,17 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
}
@Override
- public boolean send(int partitionId, ByteBuffer message) {
- try {
- ClusterNode node = nodes.get(partitionId);
- if (node == null) {
- LoggerFactory.getLogger(getClass()).error(
- "Cannot send message to partition {} because this partition is not visible to this emitter",
- partitionId);
- return false;
- }
+ public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
+ try{
+ UDPDestination udpDestination = (UDPDestination) destination;
byte[] byteBuffer = new byte[message.array().length];
System.arraycopy(message.array(), 0, byteBuffer, 0, message.array().length);
- InetAddress inetAddress = inetCache.get(partitionId);
+ InetAddress inetAddress = inetCache.get(destination);
if (inetAddress == null) {
- inetAddress = InetAddress.getByName(node.getMachineName());
- inetCache.put(partitionId, inetAddress);
+ inetAddress = InetAddress.getByName(udpDestination.getMachineName());
+ inetCache.put(destination, inetAddress);
}
- DatagramPacket dp = new DatagramPacket(byteBuffer, byteBuffer.length, inetAddress, node.getPort());
+ DatagramPacket dp = new DatagramPacket(byteBuffer, byteBuffer.length, inetAddress, udpDestination.getPort());
socket.send(dp);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -105,10 +100,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
return true;
}
- @Override
- public int getPartitionCount() {
- return topology.getPhysicalCluster().getPartitionCount();
- }
+// @Override
+// public int getPartitionCount() {
+// return topology.getPhysicalCluster().getPartitionCount();
+// }
@Override
public int getPartitionCount(String streamName) {
@@ -136,4 +131,9 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
// TODO Auto-generated method stub
}
+
+ @Override
+ public String getType() {
+ return "udp";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
index 2fe7e3e..41777cb 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -1,7 +1,10 @@
package org.apache.s4.comm.util;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.s4.base.Destination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.Cluster;
@@ -10,17 +13,29 @@ import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
public class EmitterMetrics {
- private final Meter[] emittersMeters;
+ private final Map<String, Map<String, Meter>> emittersMetersMap;
+ private Cluster cluster;
public EmitterMetrics(Cluster cluster) {
- emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
- for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
- emittersMeters[i] = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
- + cluster.getPhysicalCluster().getName() + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
- }
+ this.cluster = cluster;
+ emittersMetersMap = new HashMap<String, Map<String, Meter>>();
}
- public void sentMessage(int partitionId) {
- emittersMeters[partitionId].mark();
+ public void sentMessage(Destination destination) {
+ //TODO
+ /*
+ Map<String, Meter> map = emittersMetersMap.get(stream);
+ if (map == null) {
+ map = new HashMap<String, Meter>();
+ emittersMetersMap.put(stream, map);
+ }
+ Meter meter = emittersMetersMap.get(stream).get(partitionId);
+ if (meter == null) {
+ meter = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
+ + cluster.getPhysicalCluster().getName() + "@stream-"+ stream + "@partition-"
+ + partitionId, "event-emitted", TimeUnit.SECONDS);
+ emittersMetersMap.get(stream).put(partitionId, meter);
+ }
+ */
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..2e14173 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -28,6 +28,7 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.comm.topology.StreamConsumer;
@@ -83,8 +84,9 @@ public class DefaultRemoteSenders implements RemoteSenders {
// represented by a single stream consumer
RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
if (sender == null) {
- RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
- .getClusterName())), hasher, consumer.getClusterName());
+ Cluster cluster = remoteClusters.getCluster(consumer
+ .getClusterName());
+ RemoteSender newSender = new RemoteSender(cluster,remoteEmitters.getEmitter(cluster), hasher, consumer.getClusterName());
// TODO cleanup when remote topologies die
sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
if (sender == null) {
@@ -112,7 +114,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
@Override
public void run() {
try {
- sender.send(hashKey, serDeser.serialize(event));
+ sender.send(event.getStreamName(),hashKey, serDeser.serialize(event));
} catch (InterruptedException e) {
logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index dba8b6c..ad9e02d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -21,8 +21,10 @@ package org.apache.s4.core;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.s4.base.Destination;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
+import org.apache.s4.comm.topology.Cluster;
/**
* Sends events to a remote cluster.
@@ -34,23 +36,27 @@ public class RemoteSender {
final private Hasher hasher;
AtomicInteger targetPartition = new AtomicInteger();
final private String remoteClusterName;
+ private Cluster cluster;
- public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
+ public RemoteSender(Cluster cluster, Emitter emitter, Hasher hasher, String clusterName) {
super();
+ this.cluster = cluster;
this.emitter = emitter;
this.hasher = hasher;
this.remoteClusterName = clusterName;
}
- public void send(String hashKey, ByteBuffer message) throws InterruptedException {
+ public void send(String streamName,String hashKey, ByteBuffer message) throws InterruptedException {
int partition;
if (hashKey == null) {
// round robin by default
- partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount());
+ partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount(streamName));
} else {
- partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(streamName));
}
- emitter.send(partition, message);
+ //TODO: where do we get the mode
+ Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
+ emitter.send(destination, message);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index b103ad8..5ade15d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -21,12 +21,14 @@ package org.apache.s4.core;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
+import org.apache.s4.base.Destination;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Event;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.staging.SenderExecutorServiceFactory;
import org.apache.s4.core.util.S4Metrics;
@@ -59,6 +61,8 @@ public class SenderImpl implements Sender {
@Inject
S4Metrics metrics;
+ private Cluster cluster;
+
/**
*
* @param emitter
@@ -70,11 +74,12 @@ public class SenderImpl implements Sender {
*/
@Inject
public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
- SenderExecutorServiceFactory senderExecutorServiceFactory) {
+ SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
this.emitter = emitter;
this.serDeser = serDeser;
this.hasher = hasher;
this.assignment = assignment;
+ this.cluster = cluster;
this.tpe = senderExecutorServiceFactory.create();
}
@@ -93,7 +98,7 @@ public class SenderImpl implements Sender {
*/
@Override
public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
if (partition == localPartitionId) {
metrics.sentLocal();
/* Hey we are in the same JVM, don't use the network. */
@@ -132,7 +137,9 @@ public class SenderImpl implements Sender {
public void run() {
ByteBuffer serializedEvent = serDeser.serialize(event);
try {
- emitter.send(remotePartitionId, serializedEvent);
+ //TODO: where can we get the type ?
+ Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
+ emitter.send(destination, serializedEvent);
} catch (InterruptedException e) {
logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
Thread.currentThread().interrupt();
@@ -154,13 +161,15 @@ public class SenderImpl implements Sender {
@Override
public void run() {
ByteBuffer serializedEvent = serDeser.serialize(event);
-
- for (int i = 0; i < emitter.getPartitionCount(); i++) {
+ Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
+ for (int i = 0; i < partitionCount; i++) {
/* Don't use the comm layer when we send to the same partition. */
if (localPartitionId != i) {
try {
- emitter.send(i, serializedEvent);
+ //TODO: where to get the mode from
+ Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
+ emitter.send(destination, serializedEvent);
metrics.sentEvent(i);
} catch (InterruptedException e) {
logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 0b81bbe..c946385 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -59,7 +59,8 @@ public class S4Metrics {
@Inject
private void init() {
- senderMeters = new Meter[emitter.getPartitionCount()];
+ //TODO: FIX METER
+ senderMeters = new Meter[100];
// int localPartitionId = assignment.assignClusterNode().getPartition();
for (int i = 0; i < senderMeters.length; i++) {
senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a65684e9/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index fddbd03..e640066 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -9,6 +9,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.IdealState;
+import org.apache.s4.base.Destination;
import org.apache.s4.base.Event;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.TCPEmitter;
@@ -46,7 +47,8 @@ public class GenericEventAdapter {
KryoSerDeser serializer = new KryoSerDeser(classLoader);
// EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
System.out.println("Sending event to partition:" + partitionId);
- emitter.send(partitionId, serializer.serialize(event));
+ Destination destination = cluster.getDestination(adapterArgs.streamName, partitionId, emitter.getType());
+ emitter.send(destination, serializer.serialize(event));
Thread.sleep(1000);
}
} catch (Exception e) {