You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC

[2/19] git commit: Renaming s4statemodel factory to taskstatemodelfactory

Renaming s4statemodel factory to taskstatemodelfactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/008e5b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/008e5b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/008e5b0e

Branch: refs/heads/S4-110
Commit: 008e5b0e732101d76946345ffb7cf7d52ca561fb
Parents: 34dff89
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Dec 11 18:26:32 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Dec 11 18:26:32 2012 -0800

----------------------------------------------------------------------
 .../org/apache/s4/comm/HelixBasedCommModule.java   |    4 +-
 .../apache/s4/comm/helix/S4StateModelFactory.java  |   13 -
 .../s4/comm/helix/TaskStateModelFactory.java       |   13 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |  392 +++++++-------
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |    3 +-
 .../s4/comm/topology/AssignmentFromHelix.java      |    4 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    2 +-
 .../src/main/java/org/apache/s4/core/Server.java   |    4 +-
 8 files changed, 217 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 1a979a8..c9cfcb7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -13,7 +13,7 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
@@ -74,7 +74,7 @@ public class HelixBasedCommModule extends AbstractModule{
 
         // a node holds a single partition assignment
         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(S4StateModelFactory.class);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(TaskStateModelFactory.class);
         bind(Assignment.class).to(AssignmentFromHelix.class);
         bind(Cluster.class).to(ClusterFromHelix.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
deleted file mode 100644
index c823bd7..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.s4.comm.helix;
-
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-
-public class S4StateModelFactory extends StateModelFactory<S4StateModel>{
-
-	@Override
-	public S4StateModel createNewStateModel(String partitionName) {
-		return new S4StateModel(partitionName);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
new file mode 100644
index 0000000..eef64ea
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.comm.helix;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel>{
+
+    @Override
+    public S4StateModel createNewStateModel(String partitionName) {
+        return new S4StateModel(partitionName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5e35588..a0e5002 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -65,203 +65,203 @@ import org.apache.helix.model.InstanceConfig;
  */
 
 public class TCPEmitter implements Emitter, ClusterChangeListener {
-	private static final Logger logger = LoggerFactory
-			.getLogger(TCPEmitter.class);
-
-	private final int nettyTimeout;
-
-	private Cluster topology;
-	private final ClientBootstrap bootstrap;
-
-	/*
-	 * All channels
-	 */
-	private final ChannelGroup channels = new DefaultChannelGroup();
-
-	/*
-	 * Channel used to send messages to each Node
-	 */
-	private final BiMap<InstanceConfig, Channel> nodeChannelMap;
-
-	// lock for synchronizing between cluster updates callbacks and other code
-	private final Lock lock;
-
-	@Inject
-	SerializerDeserializer serDeser= new KryoSerDeser();
-
-	@Inject
-	public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
-			throws InterruptedException {
-		this.nettyTimeout = timeout;
-		this.topology = topology;
-		this.lock = new ReentrantLock();
-
-		// Initialize data structures
-		//int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
-		// TODO cluster can grow in size
-		nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
-				.<InstanceConfig, Channel> create());
-
-		// Initialize netty related structures
-		ChannelFactory factory = new NioClientSocketChannelFactory(
-				Executors.newCachedThreadPool(),
-				Executors.newCachedThreadPool());
-		bootstrap = new ClientBootstrap(factory);
-		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-			@Override
-			public ChannelPipeline getPipeline() {
-				ChannelPipeline p = Channels.pipeline();
-				p.addLast("1", new LengthFieldPrepender(4));
-				p.addLast("2", new ExceptionHandler());
-				return p;
-			}
-		});
-
-		bootstrap.setOption("tcpNoDelay", true);
-		bootstrap.setOption("keepAlive", true);
-		bootstrap.setOption("reuseAddress", true);
-		bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
-	}
-
-	@Inject
-	private void init() {
-		this.topology.addListener(this);
-	}
-
-	private boolean connectTo(InstanceConfig config) {
-
-		if (config == null) {
-
-			logger.error("Invalid clusterNode");
-			return false;
-		}
-
-		try {
-			ChannelFuture connectFuture = this.bootstrap
-					.connect(new InetSocketAddress(config.getHostName(),
-							Integer.parseInt(config.getPort())));
-			connectFuture.await();
-			if (connectFuture.isSuccess()) {
-				channels.add(connectFuture.getChannel());
-				nodeChannelMap
-						.forcePut(config, connectFuture.getChannel());
-				return true;
-			}
-		} catch (InterruptedException ie) {
-			logger.error(String.format("Interrupted while connecting to %s:%d",
-					config.getHostName(), config.getPort()));
-			Thread.currentThread().interrupt();
-		}
-		return false;
-	}
-
-	private void sendMessage(String streamName, int partitionId, byte[] message) {
-		ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-		buffer.writeBytes(message);
-		InstanceConfig config = topology
-				.getDestination(streamName, partitionId);
-		if (!nodeChannelMap.containsKey(config)) {
-			if (!connectTo(config)) {
-				// Couldn't connect, discard message
-				return;
-			}
-		}
-
-		Channel c = nodeChannelMap.get(config);
-		if (c == null)
-			return;
-
-		c.write(buffer).addListener(new MessageSendingListener(partitionId));
-	}
-
-	@Override
-	public boolean send(int partitionId, EventMessage message) {
-		sendMessage(message.getStreamName(), partitionId,
-				serDeser.serialize(message));
-		return true;
-	}
-
-	protected void removeChannel(int partition) {
-		Channel c = nodeChannelMap.remove(partition);
-		if (c == null) {
-			return;
-		}
-		c.close().addListener(new ChannelFutureListener() {
-			@Override
-			public void operationComplete(ChannelFuture future)
-					throws Exception {
-				if (future.isSuccess())
-					channels.remove(future.getChannel());
-				else
-					logger.error("Failed to close channel");
-			}
-		});
-	}
-
-	public void close() {
-		try {
-			channels.close().await();
-			bootstrap.releaseExternalResources();
-		} catch (InterruptedException ie) {
-			logger.error("Interrupted while closing");
-			Thread.currentThread().interrupt();
-		}
-	}
-
-	//@Override
-	public int getPartitionCount() {
-		return topology.getPhysicalCluster().getPartitionCount();
-	}
-	public int getPartitionCount(String streamName) {
+    private static final Logger logger = LoggerFactory
+            .getLogger(TCPEmitter.class);
+
+    private final int nettyTimeout;
+
+    private Cluster topology;
+    private final ClientBootstrap bootstrap;
+
+    /*
+     * All channels
+     */
+    private final ChannelGroup channels = new DefaultChannelGroup();
+
+    /*
+     * Channel used to send messages to each Node
+     */
+    private final BiMap<InstanceConfig, Channel> nodeChannelMap;
+
+    // lock for synchronizing between cluster updates callbacks and other code
+    private final Lock lock;
+
+    @Inject
+    SerializerDeserializer serDeser= new KryoSerDeser();
+
+    @Inject
+    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
+            throws InterruptedException {
+        this.nettyTimeout = timeout;
+        this.topology = topology;
+        this.lock = new ReentrantLock();
+
+        // Initialize data structures
+        //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+        // TODO cluster can grow in size
+        nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
+                .<InstanceConfig, Channel> create());
+
+        // Initialize netty related structures
+        ChannelFactory factory = new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+        bootstrap = new ClientBootstrap(factory);
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("1", new LengthFieldPrepender(4));
+                p.addLast("2", new ExceptionHandler());
+                return p;
+            }
+        });
+
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+        bootstrap.setOption("reuseAddress", true);
+        bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+    }
+
+    @Inject
+    private void init() {
+        this.topology.addListener(this);
+    }
+
+    private boolean connectTo(InstanceConfig config) {
+
+        if (config == null) {
+
+            logger.error("Invalid clusterNode");
+            return false;
+        }
+
+        try {
+            ChannelFuture connectFuture = this.bootstrap
+                    .connect(new InetSocketAddress(config.getHostName(),
+                            Integer.parseInt(config.getPort())));
+            connectFuture.await();
+            if (connectFuture.isSuccess()) {
+                channels.add(connectFuture.getChannel());
+                nodeChannelMap
+                        .forcePut(config, connectFuture.getChannel());
+                return true;
+            }
+        } catch (InterruptedException ie) {
+            logger.error(String.format("Interrupted while connecting to %s:%d",
+                    config.getHostName(), config.getPort()));
+            Thread.currentThread().interrupt();
+        }
+        return false;
+    }
+
+    private void sendMessage(String streamName, int partitionId, byte[] message) {
+        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+        buffer.writeBytes(message);
+        InstanceConfig config = topology
+                .getDestination(streamName, partitionId);
+        if (!nodeChannelMap.containsKey(config)) {
+            if (!connectTo(config)) {
+                // Couldn't connect, discard message
+                return;
+            }
+        }
+
+        Channel c = nodeChannelMap.get(config);
+        if (c == null)
+            return;
+
+        c.write(buffer).addListener(new MessageSendingListener(partitionId));
+    }
+
+    @Override
+    public boolean send(int partitionId, EventMessage message) {
+        sendMessage(message.getStreamName(), partitionId,
+                serDeser.serialize(message));
+        return true;
+    }
+
+    protected void removeChannel(int partition) {
+        Channel c = nodeChannelMap.remove(partition);
+        if (c == null) {
+            return;
+        }
+        c.close().addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+                if (future.isSuccess())
+                    channels.remove(future.getChannel());
+                else
+                    logger.error("Failed to close channel");
+            }
+        });
+    }
+
+    public void close() {
+        try {
+            channels.close().await();
+            bootstrap.releaseExternalResources();
+        } catch (InterruptedException ie) {
+            logger.error("Interrupted while closing");
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    //@Override
+    public int getPartitionCount() {
+        return topology.getPhysicalCluster().getPartitionCount();
+    }
+    public int getPartitionCount(String streamName) {
     return topology.getPhysicalCluster().getPartitionCount();
   }
 
-	class ExceptionHandler extends SimpleChannelUpstreamHandler {
-		@Override
-		public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-				throws Exception {
-			Throwable t = e.getCause();
-			if (t instanceof ClosedChannelException) {
-				nodeChannelMap.inverse().remove(e.getChannel());
-				return;
-			} else if (t instanceof ConnectException) {
-				nodeChannelMap.inverse().remove(e.getChannel());
-				return;
-			} else {
-				logger.error("Unexpected exception", t);
-			}
-		}
-	}
-
-	class MessageSendingListener implements ChannelFutureListener {
-
-		int partitionId = -1;
-
-		public MessageSendingListener(int partitionId) {
-			super();
-			this.partitionId = partitionId;
-		}
-
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			if (!future.isSuccess()) {
-				try {
-					// TODO handle possible cluster reconfiguration between send
-					// and failure callback
-					logger.warn(
-							"Failed to send message to node {} (according to current cluster information)",
-							topology.getPhysicalCluster().getNodes()
-									.get(partitionId));
-				} catch (IndexOutOfBoundsException ignored) {
-					// cluster was changed
-				}
-			}
-
-		}
-	}
-
-	@Override
-	public void onChange() {
-		
-	}
+    class ExceptionHandler extends SimpleChannelUpstreamHandler {
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+                throws Exception {
+            Throwable t = e.getCause();
+            if (t instanceof ClosedChannelException) {
+                nodeChannelMap.inverse().remove(e.getChannel());
+                return;
+            } else if (t instanceof ConnectException) {
+                nodeChannelMap.inverse().remove(e.getChannel());
+                return;
+            } else {
+                logger.error("Unexpected exception", t);
+            }
+        }
+    }
+
+    class MessageSendingListener implements ChannelFutureListener {
+
+        int partitionId = -1;
+
+        public MessageSendingListener(int partitionId) {
+            super();
+            this.partitionId = partitionId;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+                try {
+                    // TODO handle possible cluster reconfiguration between send
+                    // and failure callback
+                    logger.warn(
+                            "Failed to send message to node {} (according to current cluster information)",
+                            topology.getPhysicalCluster().getNodes()
+                                    .get(partitionId));
+                } catch (IndexOutOfBoundsException ignored) {
+                    // cluster was changed
+                }
+            }
+
+        }
+    }
+
+    @Override
+    public void onChange() {
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 7259079..d495033 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -64,8 +64,7 @@ public class TCPListener implements Listener {
 
     @Inject
     public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout) {
-System.out.println("TCPListener.TCPListener()");
-    	// wait for an assignment
+        // wait for an assignment
         node = assignment.assignClusterNode();
         nettyTimeout = timeout;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 7b39701..7e0b4ac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -26,7 +26,7 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +60,7 @@ public class AssignmentFromHelix implements Assignment
                              @Named("s4.cluster.zk_address") String zookeeperAddress 
                              ) throws Exception
   {
-    this.taskStateModelFactory = new S4StateModelFactory();
+    this.taskStateModelFactory = new TaskStateModelFactory();
 //    this.appStateModelFactory = appStateModelFactory;
     this.clusterName = clusterName;
     this.zookeeperAddress = zookeeperAddress;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index a284aad..97ec9af 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -30,7 +30,7 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreamsManager;
 import org.apache.s4.comm.topology.RemoteStreamsManagerImpl;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 666b7e7..bc172a5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -31,7 +31,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.s4.base.util.S4RLoader;
 import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromHelix;
 import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -77,7 +77,7 @@ public class Server {
     private final String instanceName;
     
     @Inject
-    private S4StateModelFactory taskStateModelFactory;
+    private TaskStateModelFactory taskStateModelFactory;
     
     @Inject
     private AppStateModelFactory appStateModelFactory;