You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/13 14:36:59 UTC
[4/4] git commit: Prevent "this" reference escape in the constructor
of several platform classes
Prevent "this" reference escape in the constructor of several platform classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/317d8b1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/317d8b1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/317d8b1e
Branch: refs/heads/piper
Commit: 317d8b1e43cf9d23bb24d45af9ac3f37fe8c8840
Parents: 5f84f1f
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 13 11:52:27 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 13 11:52:27 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 6 +-
.../s4/comm/topology/ClusterChangeListener.java | 2 +-
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 12 ++-
.../java/org/apache/s4/comm/udp/UDPListener.java | 4 +
.../src/main/java/org/apache/s4/core/App.java | 109 ++++++--------
.../src/main/java/org/apache/s4/core/Stream.java | 18 ++-
.../s4/deploy/prodcon/TestProducerConsumer.java | 39 +++++-
.../java/org/apache/s4/example/counter/MyApp.java | 2 +-
.../org/apache/s4/example/model/Controller.java | 2 +-
9 files changed, 114 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5c83b09..69ec5f1 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -120,7 +120,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
this.nettyTimeout = timeout;
this.bufferCapacity = bufferSize;
this.topology = topology;
- this.topology.addListener(this);
// Initialize data structures
int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
@@ -158,6 +157,11 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+ }
+
+ @Inject
+ private void init() {
+ this.topology.addListener(this);
refreshCluster();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
index e4452d1..c53798a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
@@ -1,7 +1,7 @@
package org.apache.s4.comm.topology;
/**
- * Entities interested in changes occuring in topologies implement this listener and should register through the
+ * Entities interested in changes occurring in topologies implement this listener and should register through the
* {@link Cluster} interface
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index dc700ea..0f29f6b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -36,7 +36,6 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
@Inject
public UDPEmitter(Cluster topology) {
this.topology = topology;
- topology.addListener(this);
nodes = HashBiMap.create(topology.getPhysicalCluster().getNodes().size());
for (ClusterNode node : topology.getPhysicalCluster().getNodes()) {
nodes.forcePut(node.getPartition(), node);
@@ -49,6 +48,12 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
}
}
+ @Inject
+ private void init() {
+ topology.addListener(this);
+ refreshCluster();
+ }
+
@Override
public boolean send(int partitionId, EventMessage eventMessage) {
try {
@@ -83,6 +88,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
@Override
public void onChange() {
+ refreshCluster();
+ }
+
+ private void refreshCluster() {
// topology changes when processes pick tasks
synchronized (nodes) {
for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
@@ -90,6 +99,7 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
nodes.forcePut(partition, clusterNode);
}
}
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 5cd62de..316999c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -45,6 +45,10 @@ public class UDPListener implements Listener, Runnable {
}
bs = new byte[BUFFER_LENGTH];
datagram = new DatagramPacket(bs, bs.length);
+ }
+
+ @Inject
+ private void init() {
(new Thread(this)).start();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index eae1dda..f5de855 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,6 @@ import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,13 +85,6 @@ public abstract class App {
};
/**
- * @return true if the application is running in cluster mode.
- */
- // public boolean isCluster() {
- // return isCluster.booleanValue();
- // }
-
- /**
* @return the unique app id
*/
public int getId() {
@@ -127,12 +119,6 @@ public abstract class App {
return pePrototypes;
}
- // void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
- // logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
- // stream2pe.put(stream, pePrototype);
- //
- // }
-
/* Returns list of internal streams. Should only be used within the core package. */
// TODO visibility
public List<Streamable<Event>> getStreams() {
@@ -276,18 +262,6 @@ public abstract class App {
}
/**
- * @param sender
- * - sends events to the communication layer.
- * @param receiver
- * - receives events from the communication layer.
- */
- public void setCommLayer(Sender sender, Receiver receiver) {
- // this.sender = sender;
- // this.receiver = receiver;
- // sender.setPartition(receiver.getPartition());
- }
-
- /**
* Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
* prototypes by key.
*
@@ -299,64 +273,73 @@ public abstract class App {
* the name of the stream
* @param finder
* the key finder object
+ * @param eventType
+ * expected event type
* @param processingElements
* the target processing elements
* @return the stream
*/
- protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+ protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, Class<T> eventType,
ProcessingElement... processingElements) {
- return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
+ return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements).setEventType(eventType)
+ .register(this);
}
/**
- * Creates a broadcast stream that sends the events to all the PE instances in each of the target prototypes.
- *
- * <p>
- * Keep in mind that if you had a million PE instances, the event would be delivered to all them.
- *
- * @param name
- * the name of the stream
- * @param processingElements
- * the target processing elements
- * @return the stream
+ * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
+ */
+ protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+ ProcessingElement... processingElements) {
+ return createStream(name, finder, null, processingElements);
+ }
+
+ /**
+ * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
*/
protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
+ return createStream(name, null, processingElements);
+ }
- return new Stream<T>(this).setName(name).setPEs(processingElements);
+ /**
+ * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
+ */
+ public <T extends Event> Stream<T> createStream(Class<T> type) {
+ return createStream(null, null, type);
}
/**
- * Creates stream with default values. Use the builder methods to configure the stream. Example:
- * <p>
- *
- * <pre>
- * s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
- * </pre>
- *
- * <p>
+ * Creates a "remote" stream, i.e. a stream that forwards events to remote clusters
*
* @param name
- * the name of the stream
- * @param processingElements
- * the target processing elements
- * @return the stream
+ * stream name, shared across communicating clusters
+ * @param finder
+ * key finder
+ * @return a reference to the created remote stream
*/
- public <T extends Event> Stream<T> createStream(Class<T> type) {
-
- Stream<T> stream = new Stream<T>(this);
- stream.setEventType(type);
- return stream;
+ protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+ return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
}
+ /**
+ * @see App#createOutputStream(String, KeyFinder)
+ */
protected <T extends Event> RemoteStream createOutputStream(String name) {
return createOutputStream(name, null);
}
- protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
- return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
- }
-
+ /**
+ * Creaters an "input" stream, i.e. a stream that listens to events from remote clusters, and that registers its
+ * interest in the stream with the specified name.
+ *
+ * @param streamName
+ * name of the remote stream
+ * @param finder
+ * key finder
+ * @param processingElements
+ * target processing elements
+ * @return a reference to the created input stream
+ */
protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
ProcessingElement... processingElements) {
remoteStreams.addInputStream(getId(), clusterName, streamName);
@@ -364,9 +347,11 @@ public abstract class App {
}
+ /**
+ * @see App#createInputStream(String, KeyFinder, ProcessingElement...)
+ */
protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
- remoteStreams.addInputStream(getId(), clusterName, streamName);
- return createStream(streamName, processingElements);
+ return createInputStream(streamName, null, processingElements);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 9b6c030..3089cb6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -47,11 +47,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
* we always register streams with the parent application.
*/
public Stream(App app) {
- // synchronized (Stream.class) {
- // id = idCounter++;
- // }
this.app = app;
- app.addStream(this);
this.sender = app.getSender();
this.receiver = app.getReceiver();
}
@@ -93,12 +89,17 @@ public class Stream<T extends Event> implements Runnable, Streamable {
* @return the stream object
*/
public Stream<T> setKey(KeyFinder<T> keyFinder) {
- this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
+ if (keyFinder == null) {
+ this.key = null;
+ } else {
+ this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
+ }
return this;
}
- void setEventType(Class<T> type) {
+ Stream<T> setEventType(Class<T> type) {
this.eventType = type;
+ return this;
}
/**
@@ -308,4 +309,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
}
}
}
+
+ public Stream<T> register(App app) {
+ app.addStream(this);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 935a3cc..df9291a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -10,6 +10,7 @@ import junit.framework.Assert;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.tools.TaskSetup;
@@ -119,18 +120,42 @@ public class TestProducerConsumer {
initializeS4Node();
- ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
- zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+ CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
+ CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+ CommTestUtils.createZkClient());
+
+ boolean consumerStreamReady = true;
+ try {
+ zkClient.getChildren("/s4/streams/tickStream/consumers");
+ } catch (ZkNoNodeException e) {
+ consumerStreamReady = false;
+ }
+ Assert.assertFalse(consumerStreamReady);
+ final CountDownLatch signalConsumerReady = new CountDownLatch(1);
+
+ zkClient.subscribeChildChanges("/s4/streams/tickStream/consumers", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 1) {
+ signalConsumerReady.countDown();
+ }
+
+ }
+ });
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT);
+ // TODO check that consumer app is ready with a better way than checking stream consumers
+ Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
- CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
- CommTestUtils.createZkClient());
- Assert.assertTrue(signalConsumptionComplete.await(40, TimeUnit.SECONDS));
+ ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
+ zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+
+ // that may be a bit long to complete...
+ Assert.assertTrue(signalConsumptionComplete.await(100, TimeUnit.SECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index eeceaed..d68a41a 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -116,7 +116,7 @@ final public class MyApp extends App {
MyApp myApp = injector.getInstance(MyApp.class);
Sender sender = injector.getInstance(Sender.class);
Receiver receiver = injector.getInstance(Receiver.class);
- myApp.setCommLayer(sender, receiver);
+ // myApp.setCommLayer(sender, receiver);
myApp.init();
myApp.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
index 24f6b1f..4571157 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
@@ -86,7 +86,7 @@ public class Controller {
MyApp app = new MyApp(numClasses, numTrainVectors, model, outputInterval, TimeUnit.SECONDS);
- app.setCommLayer(sender, receiver);
+ // app.setCommLayer(sender, receiver);
logger.info("Init app.");
app.initApp();