You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/07 21:26:08 UTC
git commit: Fixed some issues introduced when moving cluster
knowledge away from emitters. - also fixed tests so they compile correctly -
key issues are: - housekeeping for connections in emitters. In particular,
how to detect cluster changes and knowing [...]
Updated Branches:
refs/heads/S4-110-new a65684e9b -> 423b7e873
Fixed some issues introduced when moving cluster knowledge away from emitters.
- also fixed tests so they compile correctly
- key issues are:
- housekeeping for connections in emitters.
In particular, how to detect cluster changes and how to know that a given destination may
have to be reconnected to (if it failed, restarted, and reused the same connection info)
- locality resolution in sender
- inter app communication
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/423b7e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/423b7e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/423b7e87
Branch: refs/heads/S4-110-new
Commit: 423b7e873ce8bf436af46cfd2ee45c70ce14e02a
Parents: a65684e
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Feb 7 21:07:44 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Feb 7 21:07:44 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/comm/DefaultCommModule.java | 3 +-
.../org/apache/s4/comm/tcp/TCPDestination.java | 9 +-
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 74 +++++++--------
.../org/apache/s4/comm/topology/ClusterFromZK.java | 19 ++++-
.../org/apache/s4/comm/topology/ClusterNode.java | 49 +++++++++-
.../apache/s4/comm/topology/PhysicalCluster.java | 18 ++--
.../org/apache/s4/comm/udp/UDPDestination.java | 9 +-
.../java/org/apache/s4/comm/tcp/TCPBasicTest.java | 4 +-
.../java/org/apache/s4/comm/udp/UDPBasicTest.java | 4 +-
.../org/apache/s4/fixtures/TestCommModule.java | 6 +
.../main/java/org/apache/s4/core/AppModule.java | 7 +-
.../main/java/org/apache/s4/core/BaseModule.java | 28 +++---
.../main/java/org/apache/s4/core/SenderImpl.java | 6 +-
.../org/apache/s4/core/ft/FTWordCountTest.java | 3 +-
.../java/org/apache/s4/core/ft/RecoveryTest.java | 13 ++-
.../core/moduleloader/ModuleLoaderTestUtils.java | 9 +-
.../apache/s4/deploy/TestAutomaticDeployment.java | 3 +-
.../org/apache/s4/wordcount/WordCountTest.java | 10 +-
.../src/main/java/s4app/ProducerApp.java | 1 -
19 files changed, 174 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index a1d6319..8936f96 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -1,5 +1,5 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
+ok * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -104,7 +104,6 @@ public class DefaultCommModule extends AbstractModule {
.getString("s4.comm.emitter.remote.class"));
install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
RemoteEmitterFactory.class));
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
} catch (ClassNotFoundException e) {
logger.error("Cannot find class implementation ", e);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
index 0ddffb8..7523422 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
@@ -3,10 +3,13 @@ package org.apache.s4.comm.tcp;
import org.apache.s4.base.Destination;
import org.apache.s4.comm.topology.ClusterNode;
-public class TCPDestination extends ClusterNode implements Destination {
+public class TCPDestination extends ClusterNode implements Destination {
- public TCPDestination(int partition, int port, String machineName,
- String taskId) {
+ public TCPDestination(ClusterNode clusterNode) {
+ this(clusterNode.getPartition(), clusterNode.getPort(), clusterNode.getMachineName(), clusterNode.getTaskId());
+ }
+
+ public TCPDestination(int partition, int port, String machineName, String taskId) {
super(partition, port, machineName, taskId);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 266d15f..8133316 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -34,7 +34,6 @@ import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterChangeListener;
-import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.util.EmitterMetrics;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -87,7 +86,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
/*
* Channel used to send messages to each destination
*/
- private final BiMap<Destination, Channel> partitionChannelMap;
+ private final BiMap<Destination, Channel> destinationChannelMap;
// lock for synchronizing between cluster updates callbacks and other code
private final Lock lock;
@@ -95,7 +94,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
@Inject
SerializerDeserializerFactory serDeserFactory;
SerializerDeserializer serDeser;
- Map<Integer, Semaphore> writePermits = Maps.newHashMap();
+ Map<Destination, Semaphore> writePermits = Maps.newHashMap();
EmitterMetrics metrics;
@@ -122,7 +121,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
// Initialize data structures
int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
- partitionChannelMap = HashBiMap.create(clusterSize);
+ destinationChannelMap = HashBiMap.create(clusterSize);
// Initialize netty related structures
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -147,7 +146,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
@Inject
private void init() {
- refreshCluster();
this.topology.addListener(this);
serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
metrics = new EmitterMetrics(topology);
@@ -166,7 +164,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
connectFuture.await();
if (connectFuture.isSuccess()) {
channels.add(connectFuture.getChannel());
- partitionChannelMap.forcePut(destination, connectFuture.getChannel());
+ destinationChannelMap.forcePut(destination, connectFuture.getChannel());
+ writePermits.put(destination, new Semaphore(maxPendingWrites));
return true;
}
} catch (InterruptedException ie) {
@@ -176,12 +175,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
return false;
}
-
+
@Override
public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
- if (!partitionChannelMap.containsKey(destination)) {
+ if (!destinationChannelMap.containsKey(destination)) {
if (!connectTo((TCPDestination) destination)) {
logger.warn("Could not connect to partition {}, discarding message", destination);
// Couldn't connect, discard message
@@ -191,7 +190,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
writePermits.get(destination).acquire();
- Channel c = partitionChannelMap.get(destination);
+ Channel c = destinationChannelMap.get(destination);
if (c == null) {
logger.warn("Could not find channel for destination {}", destination);
return false;
@@ -201,10 +200,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
return true;
}
-
-
- protected void removeChannel(int partition) {
- Channel c = partitionChannelMap.remove(partition);
+ protected void removeChannel(Destination destination) {
+ Channel c = destinationChannelMap.remove(destination);
if (c == null) {
return;
}
@@ -239,34 +236,35 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
private void refreshCluster() {
lock.lock();
try {
- /*
- for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
- Integer partition = clusterNode.getPartition();
- if (partition == null) {
- logger.error("Illegal partition for clusterNode - " + clusterNode);
- return;
- }
-
- ClusterNode oldNode = partitionNodeMap.remove(partition);
- if (oldNode != null && !oldNode.equals(clusterNode)) {
- removeChannel(partition);
- }
- partitionNodeMap.forcePut(partition, clusterNode);
- if (!writePermits.containsKey(partition)) {
- writePermits.put(partition, new Semaphore(maxPendingWrites));
- }
-
- }*/
+ // // dropped destinations are those in local map but not in updated cluster config
+ // SetView<Destination> droppedDestinations = Sets.difference(destinationChannelMap.keySet(), Sets
+ // .newHashSet(Collections2.transform(topology.getPhysicalCluster().getNodes(),
+ // new Function<ClusterNode, Destination>() {
+ //
+ // @Override
+ // public Destination apply(ClusterNode clusterNode) {
+ // return new TCPDestination(clusterNode);
+ // }
+ // })));
+ // for (Destination dropped : droppedDestinations) {
+ // destinationChannelMap.remove(dropped);
+ // writePermits.remove(dropped);
+ // removeChannel(dropped);
+ // }
+ //
+ // for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
+ // Destination destination = new TCPDestination(clusterNode);
+ // if (!destinationChannelMap.containsKey(destination)) {
+ // destinationChannelMap.put(new TCPDestination(clusterNode), null);
+ // writePermits.put(destination, new Semaphore(maxPendingWrites));
+ // }
+ //
+ // }
} finally {
lock.unlock();
}
}
-// @Override
-// public int getPartitionCount() {
-// return topology.getPhysicalCluster().getPartitionCount();
-// }
-
@Override
public int getPartitionCount(String streamName) {
return topology.getPhysicalCluster().getPartitionCount(streamName);
@@ -277,10 +275,10 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
Throwable t = e.getCause();
if (t instanceof ClosedChannelException) {
- partitionChannelMap.inverse().remove(e.getChannel());
+ destinationChannelMap.inverse().remove(e.getChannel());
return;
} else if (t instanceof ConnectException) {
- partitionChannelMap.inverse().remove(e.getChannel());
+ destinationChannelMap.inverse().remove(e.getChannel());
return;
} else {
logger.error("Unexpected exception", t);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 27bd354..5144831 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,8 +30,9 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.apache.helix.model.InstanceConfig;
import org.apache.s4.base.Destination;
+import org.apache.s4.comm.tcp.TCPDestination;
+import org.apache.s4.comm.udp.UDPDestination;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,14 +211,28 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
doProcess();
}
+
@Override
public Destination getDestination(String streamName, int partitionId, String type) {
+ List<ClusterNode> nodes = clusterRef.get().getNodes();
+ for (ClusterNode node : nodes) {
+ if (node.getPartition() == partitionId) {
+ if ("tcp".equalsIgnoreCase(type)) {
+ return new TCPDestination(node);
+ } else if ("udp".equalsIgnoreCase(type)) {
+ return new UDPDestination(node);
+ } else {
+ logger.error("Unsupported destination type {}", type);
+ break;
+ }
+ }
+ }
return null;
}
@Override
public Integer getPartitionCount(String streamName) {
- return null;
+ return clusterRef.get().getPartitionCount(streamName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
index e622107..fd5af72 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
@@ -20,13 +20,13 @@ package org.apache.s4.comm.topology;
/**
* Represents an node.
- *
+ *
*/
public class ClusterNode {
- private int partition;
- private int port;
- private String machineName;
- private String taskId;
+ private final int partition;
+ private final int port;
+ private final String machineName;
+ private final String taskId;
public ClusterNode(int partition, int port, String machineName, String taskId) {
this.partition = partition;
@@ -51,10 +51,49 @@ public class ClusterNode {
return taskId;
}
+ @Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("{").append("partition=").append(partition).append(",port=").append(port).append(",machineName=")
.append(machineName).append(",taskId=").append(taskId).append("}");
return sb.toString();
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
+ result = prime * result + partition;
+ result = prime * result + port;
+ result = prime * result + ((taskId == null) ? 0 : taskId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClusterNode other = (ClusterNode) obj;
+ if (machineName == null) {
+ if (other.machineName != null)
+ return false;
+ } else if (!machineName.equals(other.machineName))
+ return false;
+ if (partition != other.partition)
+ return false;
+ if (port != other.port)
+ return false;
+ if (taskId == null) {
+ if (other.taskId != null)
+ return false;
+ } else if (!taskId.equals(other.taskId))
+ return false;
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 1d13cf4..c391e28 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -90,19 +90,20 @@ public class PhysicalCluster {
}
}
-// /**
-// *
-// * @return Number of partitions in the cluster.
-// */
-// public int getPartitionCount() {
-// return numPartitions;
-// }
-
+ // /**
+ // *
+ // * @return Number of partitions in the cluster.
+ // */
+ // public int getPartitionCount() {
+ // return numPartitions;
+ // }
+
/**
*
* @return Number of partitions in the cluster per stream
*/
public int getPartitionCount(String streamName) {
+ // TODO regardless of stream name ?
return numPartitions;
}
@@ -140,6 +141,7 @@ public class PhysicalCluster {
this.name = name;
}
+ @Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("{ nbNodes=").append(nodes.size()).append(",name=").append(name).append(",mode=").append(mode)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
index b424f0c..c6fe097 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
@@ -3,10 +3,13 @@ package org.apache.s4.comm.udp;
import org.apache.s4.base.Destination;
import org.apache.s4.comm.topology.ClusterNode;
-public class UDPDestination extends ClusterNode implements Destination{
+public class UDPDestination extends ClusterNode implements Destination {
- public UDPDestination(int partition, int port, String machineName,
- String taskId) {
+ public UDPDestination(ClusterNode clusterNode) {
+ this(clusterNode.getPartition(), clusterNode.getPort(), clusterNode.getMachineName(), clusterNode.getTaskId());
+ }
+
+ public UDPDestination(int partition, int port, String machineName, String taskId) {
super(partition, port, machineName, taskId);
// TODO Auto-generated constructor stub
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index e400813..600ff79 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -7,6 +7,7 @@ import junit.framework.Assert;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.MockReceiverModule;
import org.apache.s4.fixtures.NoOpReceiverModule;
@@ -45,7 +46,8 @@ public class TCPBasicTest extends ZkBasedTest {
injector2.getInstance(Listener.class);
// send to the other node
- emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+ emitter.send(new TCPDestination(injector2.getInstance(Assignment.class).assignClusterNode()), injector1
+ .getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
// check receiver got the message
Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index b06734b..6136a04 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -7,6 +7,7 @@ import junit.framework.Assert;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.MockReceiverModule;
import org.apache.s4.fixtures.NoOpReceiverModule;
@@ -44,7 +45,8 @@ public class UDPBasicTest extends ZkBasedTest {
injector2.getInstance(Listener.class);
// send to the other partition (1)
- emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+ emitter.send(new UDPDestination(injector2.getInstance(Assignment.class).assignClusterNode()), injector1
+ .getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index 827ec42..c4e6bb6 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,11 +5,14 @@ import java.io.InputStream;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
+import com.google.inject.Scopes;
import com.google.inject.name.Names;
/**
@@ -39,5 +42,8 @@ public class TestCommModule extends DefaultCommModule {
ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
zkClient.setZkSerializer(new ZNRecordSerializer());
bind(ZkClient.class).toInstance(zkClient);
+
+ bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
index 20556f8..28da720 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -9,6 +9,7 @@ import org.apache.s4.comm.tcp.TCPListener;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
+import com.google.inject.Scopes;
public class AppModule extends AbstractModule {
@@ -27,11 +28,11 @@ public class AppModule extends AbstractModule {
protected void configure() {
// bind(S4Metrics.class);
- bind(Receiver.class).to(ReceiverImpl.class);
- bind(Sender.class).to(SenderImpl.class);
+ bind(Receiver.class).to(ReceiverImpl.class).in(Scopes.SINGLETON);
+ bind(Sender.class).to(SenderImpl.class).in(Scopes.SINGLETON);
// TODO allow pluggable transport implementation
- bind(Listener.class).to(TCPListener.class);
+ bind(Listener.class).to(TCPListener.class).in(Scopes.SINGLETON);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 171c564..ccb478f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -9,8 +9,10 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
+import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
@@ -45,32 +47,30 @@ public class BaseModule extends AbstractModule {
if (config == null) {
loadProperties(binder());
}
- if (useHelix) {
- bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+ bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+ if (config.containsKey("s4.helix") && config.getBoolean("s4.helix")) {
bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromHelix.class);
bind(TaskStateModelFactory.class);
bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
// bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
- bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(Bootstrap.class).to(S4HelixBootstrap.class);
// share the Zookeeper connection
return;
- }
- // a node holds a single partition assignment
- // ==> Assignment is a singleton so it shared between base, comm and app
- // layers.
- // it is eager so that the node is able to join a cluster immediately
- // bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
- // bind(Cluster.class).to(ClusterFromZK.class);
+ } else {
+ // a node holds a single partition assignment
+ // ==> Assignment is a singleton so it shared between base, comm and app
+ // layers.
+ // it is eager so that the node is able to join a cluster immediately
+ bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+ bind(Cluster.class).to(ClusterFromZK.class);
- bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
- bind(Bootstrap.class).to(S4Bootstrap.class);
+ bind(Bootstrap.class).to(S4Bootstrap.class);
- // share the Zookeeper connection
- bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 5ade15d..5e4da13 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -61,7 +61,7 @@ public class SenderImpl implements Sender {
@Inject
S4Metrics metrics;
- private Cluster cluster;
+ private final Cluster cluster;
/**
*
@@ -137,7 +137,7 @@ public class SenderImpl implements Sender {
public void run() {
ByteBuffer serializedEvent = serDeser.serialize(event);
try {
- //TODO: where can we get the type ?
+ // TODO: where can we get the type ?
Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
emitter.send(destination, serializedEvent);
} catch (InterruptedException e) {
@@ -167,7 +167,7 @@ public class SenderImpl implements Sender {
/* Don't use the comm layer when we send to the same partition. */
if (localPartitionId != i) {
try {
- //TODO: where to get the mode from
+ // TODO: where to get the mode from
Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
emitter.send(destination, serializedEvent);
metrics.sentEvent(i);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 50846b4..a1ec575 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -26,6 +26,7 @@ import junit.framework.Assert;
import org.apache.s4.base.Event;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.core.util.AppConfig;
import org.apache.s4.deploy.DeploymentUtils;
@@ -125,7 +126,7 @@ public class FTWordCountTest extends ZkBasedTest {
event.setStreamId("inputStream");
event.put("sentence", String.class, sentence);
emitter.send(
- 0,
+ new TCPDestination(0, 1300, "localhost", "Task-0"),
injector.getInstance(SerializerDeserializerFactory.class)
.createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 4dc9484..1236e14 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -28,6 +28,7 @@ import junit.framework.Assert;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.s4.base.Event;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.core.util.AppConfig;
@@ -90,9 +91,9 @@ public class RecoveryTest extends ZkBasedTest {
event.setStreamId("inputStream");
event.put("command", String.class, "checkpoint");
emitter.send(
- 0,
- injector.getInstance(SerializerDeserializerFactory.class)
- .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
+
+ new TCPDestination(0, 1300, "localhost", "Task-0"), injector.getInstance(SerializerDeserializerFactory.class)
+ .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
}
private void testCheckpointingConfiguration(Class<?> appClass, Class<?> backendModuleClass,
@@ -136,7 +137,7 @@ public class RecoveryTest extends ZkBasedTest {
event.put("value", String.class, "message1");
event.setStreamId("inputStream");
emitter.send(
- 0,
+ new TCPDestination(0, 1300, "localhost", "Task-0"),
injector.getInstance(SerializerDeserializerFactory.class)
.createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
@@ -172,11 +173,11 @@ public class RecoveryTest extends ZkBasedTest {
event.put("command", String.class, "setValue2");
event.put("value", String.class, "message2");
emitter.send(
- 0,
+ new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
injector.getInstance(SerializerDeserializerFactory.class)
.createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
- Assert.assertTrue(signalValue2Set.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(signalValue2Set.await(20, TimeUnit.SECONDS));
Assert.assertEquals(expectedFinalResult, new String(zk.getData("/data", false, null)));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index 1ca862c..aedac39 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -12,6 +12,7 @@ import org.apache.s4.base.Event;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.core.BaseModule;
@@ -68,8 +69,8 @@ public class ModuleLoaderTestUtils {
}
Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), "cluster1",null), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream()));
+ .openStream(), "cluster1", null),
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
Emitter emitter = injector.getInstance(TCPEmitter.class);
List<Long> messages = Lists.newArrayList();
@@ -93,11 +94,11 @@ public class ModuleLoaderTestUtils {
Event event = new Event();
event.put("message", long.class, message);
event.setStreamId("inputStream");
- emitter.send(0, serDeser.serialize(event));
+ emitter.send(new TCPDestination(0, 1300, "localhost", "Task-0"), serDeser.serialize(event));
}
// check sequential nodes in zk with correct data
- Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(signalMessagesReceived.await(20, TimeUnit.SECONDS));
List<String> children = zkClient.getChildren("/test");
for (String child : children) {
Long data = zkClient.readData("/test/" + child);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 8b5329b..f8e0096 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.s4.base.Event;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.core.util.AppConfig;
@@ -127,7 +128,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
event.setStreamId("inputStream");
event.put("line", String.class, time1);
emitter.send(
- 0,
+ new TCPDestination(0, 1300, "localhost", "Task-0"),
injector.getInstance(SerializerDeserializerFactory.class)
.createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index c25d756..8d2b58c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -27,6 +27,7 @@ import junit.framework.Assert;
import org.apache.s4.base.Event;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.core.BaseModule;
import org.apache.s4.core.DefaultCoreModule;
@@ -59,9 +60,9 @@ public class WordCountTest extends ZkBasedTest {
public void createEmitter() throws IOException {
injector = Guice.createInjector(new BaseModule(
- Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null), new DefaultCommModule(
- Resources.getResource("default.s4.comm.properties").openStream()), new DefaultCoreModule(Resources
- .getResource("default.s4.core.properties").openStream()));
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
emitter = injector.getInstance(TCPEmitter.class);
@@ -113,9 +114,8 @@ public class WordCountTest extends ZkBasedTest {
// NOTE: we send to partition 0 since partition 1 hosts the emitter
emitter.send(
- 0,
+ new TCPDestination(0, 1300, "localhost", "Task-0"),
injector.getInstance(SerializerDeserializerFactory.class)
.createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
index ac7f251..b276342 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
@@ -34,7 +34,6 @@ public class ProducerApp extends App {
@Override
protected void onInit() {
System.out.println("Initing CounterApp...");
-
producerPE = createPE(ProducerPE.class, "producer");
producerPE.setStreams(createOutputStream("tickStream"));