You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC

[3/19] git commit: Changing the Emitters to use stream specific partitioning

Changing the Emitters to use stream specific partitioning


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/34dff892
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/34dff892
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/34dff892

Branch: refs/heads/S4-110
Commit: 34dff8924d242b64303a1aad67b2fef57f8c55d0
Parents: f3e56df
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sun Dec 2 14:45:25 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sun Dec 2 14:45:25 2012 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Emitter.java  |    2 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    5 +-
 .../java/org/apache/s4/comm/topology/Cluster.java  |    2 +
 .../apache/s4/comm/topology/ClusterFromHelix.java  |  284 +++++++++------
 .../org/apache/s4/comm/topology/ClusterFromZK.java |    6 +
 .../apache/s4/comm/topology/PhysicalCluster.java   |    3 +
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   10 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |    4 +-
 .../src/main/java/org/apache/s4/core/Sender.java   |    4 +-
 9 files changed, 203 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index c73dd1c..1d0e65e 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -37,5 +37,7 @@ public interface Emitter {
 
     int getPartitionCount();
 
+    int getPartitionCount(String stream);
+
     void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5d59be7..5e35588 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -208,10 +208,13 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 		}
 	}
 
-	@Override
+	//@Override
 	public int getPartitionCount() {
 		return topology.getPhysicalCluster().getPartitionCount();
 	}
+	public int getPartitionCount(String streamName) {
+    return topology.getPhysicalCluster().getPartitionCount();
+  }
 
 	class ExceptionHandler extends SimpleChannelUpstreamHandler {
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index 33afcfe..6d98be3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -33,4 +33,6 @@ public interface Cluster {
     public void removeListener(ClusterChangeListener listener);
 
     InstanceConfig getDestination(String streamName, int partitionId);
+
+    Integer getPartitionCount(String streamName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 67053a9..3226fab 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -19,7 +19,10 @@
 package org.apache.s4.comm.topology;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,8 +32,16 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
 
@@ -39,115 +50,168 @@ import org.apache.helix.spectator.RoutingTableProvider;
  * listeners of runtime changes in the configuration.
  * 
  */
-public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
-
-	private static Logger logger = LoggerFactory
-			.getLogger(ClusterFromHelix.class);
-
-	private String clusterName;
-	private final AtomicReference<PhysicalCluster> clusterRef;
-	private final List<ClusterChangeListener> listeners;
-	private final Lock lock;
-
-	/**
-	 * only the local topology
-	 */
-	@Inject
-	public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
-			@Named("s4.cluster.zk_address") String zookeeperAddress,
-			@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-			@Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
-			throws Exception {
-		this.clusterRef = new AtomicReference<PhysicalCluster>();
-		this.listeners = new ArrayList<ClusterChangeListener>();
-		lock = new ReentrantLock();
-
-	}
-
-	/**
-	 * any topology
-	 */
-	public ClusterFromHelix(String clusterName, ZkClient zkClient,
-			String machineId) {
-		this.clusterName = clusterName;
-		this.clusterRef = new AtomicReference<PhysicalCluster>();
-		this.listeners = new ArrayList<ClusterChangeListener>();
-		lock = new ReentrantLock();
-
-	}
-
-	@Override
-	public void onExternalViewChange(List<ExternalView> externalViewList,
-			NotificationContext changeContext) {
-		lock.lock();
-		try {
-		  logger.info("Start:Processing change in cluster topology");
-			super.onExternalViewChange(externalViewList, changeContext);
-			for(ClusterChangeListener listener:listeners){
-			  listener.onChange();
-			}
-	    logger.info("End:Processing change in cluster topology");
-
-		} catch (Exception e) {
-			logger.error("", e);
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	@Override
-	public PhysicalCluster getPhysicalCluster() {
-		return clusterRef.get();
-	}
-
-	@Override
-	public void addListener(ClusterChangeListener listener) {
-		logger.info("Adding topology change listener:" + listener);
-		listeners.add(listener);
-	}
-
-	@Override
-	public void removeListener(ClusterChangeListener listener) {
-		logger.info("Removing topology change listener:" + listener);
-		listeners.remove(listener);
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result
-				+ ((clusterName == null) ? 0 : clusterName.hashCode());
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		ClusterFromHelix other = (ClusterFromHelix) obj;
-		if (clusterName == null) {
-			if (other.clusterName != null)
-				return false;
-		} else if (!clusterName.equals(other.clusterName))
-			return false;
-		return true;
-	}
-
-	@Override
-	public InstanceConfig getDestination(String streamName, int partitionId) {
-		List<InstanceConfig> instances = getInstances(streamName, streamName
-				+ "_" + partitionId, "LEADER");
-		if (instances.size() == 1) {
-			return instances.get(0);
-		} else {
-			return null;
-		}
-
-	}
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster
+{
+
+  private static Logger logger = LoggerFactory
+      .getLogger(ClusterFromHelix.class);
+
+  private final String clusterName;
+  private final AtomicReference<PhysicalCluster> clusterRef;
+  private final List<ClusterChangeListener> listeners;
+  private final Lock lock;
+  private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+
+  /**
+   * only the local topology
+   */
+  @Inject
+  public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+      @Named("s4.cluster.zk_address") String zookeeperAddress,
+      @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+      @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
+      throws Exception
+  {
+    this.clusterName = clusterName;
+    Map<String, Integer> map = Collections.emptyMap();
+    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+    this.clusterRef = new AtomicReference<PhysicalCluster>();
+    this.listeners = new ArrayList<ClusterChangeListener>();
+    lock = new ReentrantLock();
+
+  }
+
+  /**
+   * any topology
+   */
+  public ClusterFromHelix(String clusterName, ZkClient zkClient,
+      String machineId)
+  {
+    this.clusterName = clusterName;
+    Map<String, Integer> map = Collections.emptyMap();
+    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+    this.clusterRef = new AtomicReference<PhysicalCluster>();
+    this.listeners = new ArrayList<ClusterChangeListener>();
+    lock = new ReentrantLock();
+
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext)
+  {
+    lock.lock();
+    try
+    {
+      logger.info("Start:Processing change in cluster topology");
+      super.onExternalViewChange(externalViewList, changeContext);
+      HelixManager manager = changeContext.getManager();
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      ConfigAccessor configAccessor = manager.getConfigAccessor();
+      ConfigScopeBuilder builder = new ConfigScopeBuilder();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      List<String> resources = helixDataAccessor.getChildNames(keyBuilder
+          .idealStates());
+      Map<String,Integer> map = new HashMap<String, Integer>();
+      for (String resource : resources)
+      {
+        String resourceType = configAccessor.get(
+            builder.forCluster(clusterName).forResource(resource)
+                .build(), "type");
+        if("Task".equalsIgnoreCase(resourceType)){
+          String streamName = configAccessor.get(
+              builder.forCluster(clusterName).forResource(resource)
+                  .build(), "streamName");
+          IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+          map.put(streamName, idealstate.getNumPartitions());
+        }
+      }
+      partitionCountMapRef.set(map);
+      for (ClusterChangeListener listener : listeners)
+      {
+        listener.onChange();
+      }
+      logger.info("End:Processing change in cluster topology");
+
+    } catch (Exception e)
+    {
+      logger.error("", e);
+    } finally
+    {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public PhysicalCluster getPhysicalCluster()
+  {
+    return clusterRef.get();
+  }
+
+  @Override
+  public void addListener(ClusterChangeListener listener)
+  {
+    logger.info("Adding topology change listener:" + listener);
+    listeners.add(listener);
+  }
+
+  @Override
+  public void removeListener(ClusterChangeListener listener)
+  {
+    logger.info("Removing topology change listener:" + listener);
+    listeners.remove(listener);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((clusterName == null) ? 0 : clusterName.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ClusterFromHelix other = (ClusterFromHelix) obj;
+    if (clusterName == null)
+    {
+      if (other.clusterName != null)
+        return false;
+    } else if (!clusterName.equals(other.clusterName))
+      return false;
+    return true;
+  }
+
+  @Override
+  public InstanceConfig getDestination(String streamName, int partitionId)
+  {
+    List<InstanceConfig> instances = getInstances(streamName, streamName + "_"
+        + partitionId, "LEADER");
+    if (instances.size() == 1)
+    {
+      return instances.get(0);
+    } else
+    {
+      return null;
+    }
+  }
+  
+  @Override
+  public Integer getPartitionCount(String streamName){
+    Integer numPartitions = partitionCountMapRef.get().get(streamName);
+    if(numPartitions==null){
+      return -1;
+    }
+    return numPartitions;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 87dc0f3..f6d4df9 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -221,4 +221,10 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
       return null;
     }
 
+    @Override
+    public Integer getPartitionCount(String streamName)
+    {
+      return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 7c590ac..ce53c1a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -98,6 +98,9 @@ public class PhysicalCluster {
         return numPartitions;
     }
 
+    public int getPartitionCount(String stream) {
+      return numPartitions;
+  }
     /**
      * @param node
      */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 2f156b2..fb18dd6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -103,11 +103,11 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         return true;
     }
 
-    @Override
+   // @Override
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
-
+    
     @Override
     public void onChange() {
         refreshCluster();
@@ -129,4 +129,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public int getPartitionCount(String stream)
+    {
+      return topology.getPhysicalCluster().getPartitionCount(stream);      
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index daccbaa..6bc52e3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -41,9 +41,9 @@ public class RemoteSender {
     public void send(String hashKey, EventMessage eventMessage) {
         if (hashKey == null) {
             // round robin by default
-            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount()), eventMessage);
+            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())), eventMessage);
         } else {
-            int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+            int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(eventMessage.getStreamName()));
             emitter.send(partition, eventMessage);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/34dff892/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..961cc77 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -86,7 +86,7 @@ public class Sender {
      * 
      */
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
 
         if (partition == localPartitionId) {
             /* Hey we are in the same JVM, don't use the network. */
@@ -110,7 +110,7 @@ public class Sender {
      */
     public void sendToRemotePartitions(Event event) {
 
-        for (int i = 0; i < emitter.getPartitionCount(); i++) {
+        for (int i = 0; i < emitter.getPartitionCount(event.getStreamName()); i++) {
 
             /* Don't use the comm layer when we send to the same partition. */
             if (localPartitionId != i)