You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC
[3/19] git commit: Changing the Emitters to use stream specific partitioning
Changing the Emitters to use stream specific partitioning
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/34dff892
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/34dff892
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/34dff892
Branch: refs/heads/S4-110
Commit: 34dff8924d242b64303a1aad67b2fef57f8c55d0
Parents: f3e56df
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sun Dec 2 14:45:25 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sun Dec 2 14:45:25 2012 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Emitter.java | 2 +
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 5 +-
.../java/org/apache/s4/comm/topology/Cluster.java | 2 +
.../apache/s4/comm/topology/ClusterFromHelix.java | 284 +++++++++------
.../org/apache/s4/comm/topology/ClusterFromZK.java | 6 +
.../apache/s4/comm/topology/PhysicalCluster.java | 3 +
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 10 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 4 +-
.../src/main/java/org/apache/s4/core/Sender.java | 4 +-
9 files changed, 203 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index c73dd1c..1d0e65e 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -37,5 +37,7 @@ public interface Emitter {
int getPartitionCount();
+ int getPartitionCount(String stream);
+
void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5d59be7..5e35588 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -208,10 +208,13 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
}
- @Override
+ //@Override
public int getPartitionCount() {
return topology.getPhysicalCluster().getPartitionCount();
}
+ public int getPartitionCount(String streamName) {
+ return topology.getPhysicalCluster().getPartitionCount();
+ }
class ExceptionHandler extends SimpleChannelUpstreamHandler {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index 33afcfe..6d98be3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -33,4 +33,6 @@ public interface Cluster {
public void removeListener(ClusterChangeListener listener);
InstanceConfig getDestination(String streamName, int partitionId);
+
+ Integer getPartitionCount(String streamName);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 67053a9..3226fab 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -19,7 +19,10 @@
package org.apache.s4.comm.topology;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -29,8 +32,16 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.name.Named;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
@@ -39,115 +50,168 @@ import org.apache.helix.spectator.RoutingTableProvider;
* listeners of runtime changes in the configuration.
*
*/
-public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
-
- private static Logger logger = LoggerFactory
- .getLogger(ClusterFromHelix.class);
-
- private String clusterName;
- private final AtomicReference<PhysicalCluster> clusterRef;
- private final List<ClusterChangeListener> listeners;
- private final Lock lock;
-
- /**
- * only the local topology
- */
- @Inject
- public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
- throws Exception {
- this.clusterRef = new AtomicReference<PhysicalCluster>();
- this.listeners = new ArrayList<ClusterChangeListener>();
- lock = new ReentrantLock();
-
- }
-
- /**
- * any topology
- */
- public ClusterFromHelix(String clusterName, ZkClient zkClient,
- String machineId) {
- this.clusterName = clusterName;
- this.clusterRef = new AtomicReference<PhysicalCluster>();
- this.listeners = new ArrayList<ClusterChangeListener>();
- lock = new ReentrantLock();
-
- }
-
- @Override
- public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext) {
- lock.lock();
- try {
- logger.info("Start:Processing change in cluster topology");
- super.onExternalViewChange(externalViewList, changeContext);
- for(ClusterChangeListener listener:listeners){
- listener.onChange();
- }
- logger.info("End:Processing change in cluster topology");
-
- } catch (Exception e) {
- logger.error("", e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public PhysicalCluster getPhysicalCluster() {
- return clusterRef.get();
- }
-
- @Override
- public void addListener(ClusterChangeListener listener) {
- logger.info("Adding topology change listener:" + listener);
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(ClusterChangeListener listener) {
- logger.info("Removing topology change listener:" + listener);
- listeners.remove(listener);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((clusterName == null) ? 0 : clusterName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ClusterFromHelix other = (ClusterFromHelix) obj;
- if (clusterName == null) {
- if (other.clusterName != null)
- return false;
- } else if (!clusterName.equals(other.clusterName))
- return false;
- return true;
- }
-
- @Override
- public InstanceConfig getDestination(String streamName, int partitionId) {
- List<InstanceConfig> instances = getInstances(streamName, streamName
- + "_" + partitionId, "LEADER");
- if (instances.size() == 1) {
- return instances.get(0);
- } else {
- return null;
- }
-
- }
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster
+{
+
+ private static Logger logger = LoggerFactory
+ .getLogger(ClusterFromHelix.class);
+
+ private final String clusterName;
+ private final AtomicReference<PhysicalCluster> clusterRef;
+ private final List<ClusterChangeListener> listeners;
+ private final Lock lock;
+ private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+
+ /**
+ * only the local topology
+ */
+ @Inject
+ public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
+ throws Exception
+ {
+ this.clusterName = clusterName;
+ Map<String, Integer> map = Collections.emptyMap();
+ partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ lock = new ReentrantLock();
+
+ }
+
+ /**
+ * any topology
+ */
+ public ClusterFromHelix(String clusterName, ZkClient zkClient,
+ String machineId)
+ {
+ this.clusterName = clusterName;
+ Map<String, Integer> map = Collections.emptyMap();
+ partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ lock = new ReentrantLock();
+
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ lock.lock();
+ try
+ {
+ logger.info("Start:Processing change in cluster topology");
+ super.onExternalViewChange(externalViewList, changeContext);
+ HelixManager manager = changeContext.getManager();
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ List<String> resources = helixDataAccessor.getChildNames(keyBuilder
+ .idealStates());
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ for (String resource : resources)
+ {
+ String resourceType = configAccessor.get(
+ builder.forCluster(clusterName).forResource(resource)
+ .build(), "type");
+ if("Task".equalsIgnoreCase(resourceType)){
+ String streamName = configAccessor.get(
+ builder.forCluster(clusterName).forResource(resource)
+ .build(), "streamName");
+ IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+ map.put(streamName, idealstate.getNumPartitions());
+ }
+ }
+ partitionCountMapRef.set(map);
+ for (ClusterChangeListener listener : listeners)
+ {
+ listener.onChange();
+ }
+ logger.info("End:Processing change in cluster topology");
+
+ } catch (Exception e)
+ {
+ logger.error("", e);
+ } finally
+ {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public PhysicalCluster getPhysicalCluster()
+ {
+ return clusterRef.get();
+ }
+
+ @Override
+ public void addListener(ClusterChangeListener listener)
+ {
+ logger.info("Adding topology change listener:" + listener);
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(ClusterChangeListener listener)
+ {
+ logger.info("Removing topology change listener:" + listener);
+ listeners.remove(listener);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((clusterName == null) ? 0 : clusterName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClusterFromHelix other = (ClusterFromHelix) obj;
+ if (clusterName == null)
+ {
+ if (other.clusterName != null)
+ return false;
+ } else if (!clusterName.equals(other.clusterName))
+ return false;
+ return true;
+ }
+
+ @Override
+ public InstanceConfig getDestination(String streamName, int partitionId)
+ {
+ List<InstanceConfig> instances = getInstances(streamName, streamName + "_"
+ + partitionId, "LEADER");
+ if (instances.size() == 1)
+ {
+ return instances.get(0);
+ } else
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public Integer getPartitionCount(String streamName){
+ Integer numPartitions = partitionCountMapRef.get().get(streamName);
+ if(numPartitions==null){
+ return -1;
+ }
+ return numPartitions;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 87dc0f3..f6d4df9 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -221,4 +221,10 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
return null;
}
+ @Override
+ public Integer getPartitionCount(String streamName)
+ {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 7c590ac..ce53c1a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -98,6 +98,9 @@ public class PhysicalCluster {
return numPartitions;
}
+ public int getPartitionCount(String stream) {
+ return numPartitions;
+ }
/**
* @param node
*/
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 2f156b2..fb18dd6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -103,11 +103,11 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
return true;
}
- @Override
+ // @Override
public int getPartitionCount() {
return topology.getPhysicalCluster().getPartitionCount();
}
-
+
@Override
public void onChange() {
refreshCluster();
@@ -129,4 +129,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
// TODO Auto-generated method stub
}
+
+ @Override
+ public int getPartitionCount(String stream)
+ {
+ return topology.getPhysicalCluster().getPartitionCount(stream);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index daccbaa..6bc52e3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -41,9 +41,9 @@ public class RemoteSender {
public void send(String hashKey, EventMessage eventMessage) {
if (hashKey == null) {
// round robin by default
- emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount()), eventMessage);
+ emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())), eventMessage);
} else {
- int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(eventMessage.getStreamName()));
emitter.send(partition, eventMessage);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..961cc77 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -86,7 +86,7 @@ public class Sender {
*
*/
public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
if (partition == localPartitionId) {
/* Hey we are in the same JVM, don't use the network. */
@@ -110,7 +110,7 @@ public class Sender {
*/
public void sendToRemotePartitions(Event event) {
- for (int i = 0; i < emitter.getPartitionCount(); i++) {
+ for (int i = 0; i < emitter.getPartitionCount(event.getStreamName()); i++) {
/* Don't use the comm layer when we send to the same partition. */
if (localPartitionId != i)