You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/13 14:36:59 UTC

[4/4] git commit: Prevent "this" reference escape in the constructor of several platform classes

Prevent "this" reference escape in the constructor of several platform classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/317d8b1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/317d8b1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/317d8b1e

Branch: refs/heads/piper
Commit: 317d8b1e43cf9d23bb24d45af9ac3f37fe8c8840
Parents: 5f84f1f
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 13 11:52:27 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 13 11:52:27 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    6 +-
 .../s4/comm/topology/ClusterChangeListener.java    |    2 +-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   12 ++-
 .../java/org/apache/s4/comm/udp/UDPListener.java   |    4 +
 .../src/main/java/org/apache/s4/core/App.java      |  109 ++++++--------
 .../src/main/java/org/apache/s4/core/Stream.java   |   18 ++-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   39 +++++-
 .../java/org/apache/s4/example/counter/MyApp.java  |    2 +-
 .../org/apache/s4/example/model/Controller.java    |    2 +-
 9 files changed, 114 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5c83b09..69ec5f1 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -120,7 +120,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         this.nettyTimeout = timeout;
         this.bufferCapacity = bufferSize;
         this.topology = topology;
-        this.topology.addListener(this);
 
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
@@ -158,6 +157,11 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
 
+    }
+
+    @Inject
+    private void init() {
+        this.topology.addListener(this);
         refreshCluster();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
index e4452d1..c53798a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
@@ -1,7 +1,7 @@
 package org.apache.s4.comm.topology;
 
 /**
- * Entities interested in changes occuring in topologies implement this listener and should register through the
+ * Entities interested in changes occurring in topologies implement this listener and should register through the
  * {@link Cluster} interface
  * 
  */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index dc700ea..0f29f6b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -36,7 +36,6 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
     @Inject
     public UDPEmitter(Cluster topology) {
         this.topology = topology;
-        topology.addListener(this);
         nodes = HashBiMap.create(topology.getPhysicalCluster().getNodes().size());
         for (ClusterNode node : topology.getPhysicalCluster().getNodes()) {
             nodes.forcePut(node.getPartition(), node);
@@ -49,6 +48,12 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
         }
     }
 
+    @Inject
+    private void init() {
+        topology.addListener(this);
+        refreshCluster();
+    }
+
     @Override
     public boolean send(int partitionId, EventMessage eventMessage) {
         try {
@@ -83,6 +88,10 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
 
     @Override
     public void onChange() {
+        refreshCluster();
+    }
+
+    private void refreshCluster() {
         // topology changes when processes pick tasks
         synchronized (nodes) {
             for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
@@ -90,6 +99,7 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
                 nodes.forcePut(partition, clusterNode);
             }
         }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 5cd62de..316999c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -45,6 +45,10 @@ public class UDPListener implements Listener, Runnable {
         }
         bs = new byte[BUFFER_LENGTH];
         datagram = new DatagramPacket(bs, bs.length);
+    }
+
+    @Inject
+    private void init() {
         (new Thread(this)).start();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index eae1dda..f5de855 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,6 @@ import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +85,6 @@ public abstract class App {
     };
 
     /**
-     * @return true if the application is running in cluster mode.
-     */
-    // public boolean isCluster() {
-    // return isCluster.booleanValue();
-    // }
-
-    /**
      * @return the unique app id
      */
     public int getId() {
@@ -127,12 +119,6 @@ public abstract class App {
         return pePrototypes;
     }
 
-    // void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
-    // logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
-    // stream2pe.put(stream, pePrototype);
-    //
-    // }
-
     /* Returns list of internal streams. Should only be used within the core package. */
     // TODO visibility
     public List<Streamable<Event>> getStreams() {
@@ -276,18 +262,6 @@ public abstract class App {
     }
 
     /**
-     * @param sender
-     *            - sends events to the communication layer.
-     * @param receiver
-     *            - receives events from the communication layer.
-     */
-    public void setCommLayer(Sender sender, Receiver receiver) {
-        // this.sender = sender;
-        // this.receiver = receiver;
-        // sender.setPartition(receiver.getPartition());
-    }
-
-    /**
      * Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
      * prototypes by key.
      * 
@@ -299,64 +273,73 @@ public abstract class App {
      *            the name of the stream
      * @param finder
      *            the key finder object
+     * @param eventType
+     *            expected event type
      * @param processingElements
      *            the target processing elements
      * @return the stream
      */
-    protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+    protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, Class<T> eventType,
             ProcessingElement... processingElements) {
 
-        return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
+        return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements).setEventType(eventType)
+                .register(this);
     }
 
     /**
-     * Creates a broadcast stream that sends the events to all the PE instances in each of the target prototypes.
-     * 
-     * <p>
-     * Keep in mind that if you had a million PE instances, the event would be delivered to all them.
-     * 
-     * @param name
-     *            the name of the stream
-     * @param processingElements
-     *            the target processing elements
-     * @return the stream
+     * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
+     */
+    protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
+            ProcessingElement... processingElements) {
+        return createStream(name, finder, null, processingElements);
+    }
+
+    /**
+     * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
      */
     protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
+        return createStream(name, null, processingElements);
+    }
 
-        return new Stream<T>(this).setName(name).setPEs(processingElements);
+    /**
+     * @see App#createStream(String, KeyFinder, Class, ProcessingElement...)
+     */
+    public <T extends Event> Stream<T> createStream(Class<T> type) {
+        return createStream(null, null, type);
     }
 
     /**
-     * Creates stream with default values. Use the builder methods to configure the stream. Example:
-     * <p>
-     * 
-     * <pre>
-     *  s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
-     * </pre>
-     * 
-     * <p>
+     * Creates a "remote" stream, i.e. a stream that forwards events to remote clusters
      * 
      * @param name
-     *            the name of the stream
-     * @param processingElements
-     *            the target processing elements
-     * @return the stream
+     *            stream name, shared across communicating clusters
+     * @param finder
+     *            key finder
+     * @return a reference to the created remote stream
      */
-    public <T extends Event> Stream<T> createStream(Class<T> type) {
-
-        Stream<T> stream = new Stream<T>(this);
-        stream.setEventType(type);
-        return stream;
+    protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+        return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
     }
 
+    /**
+     * @see App#createOutputStream(String, KeyFinder)
+     */
     protected <T extends Event> RemoteStream createOutputStream(String name) {
         return createOutputStream(name, null);
     }
 
-    protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
-        return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
-    }
-
+    /**
+     * Creates an "input" stream, i.e. a stream that listens to events from remote clusters, and that registers its
+     * interest in the stream with the specified name.
+     * 
+     * @param streamName
+     *            name of the remote stream
+     * @param finder
+     *            key finder
+     * @param processingElements
+     *            target processing elements
+     * @return a reference to the created input stream
+     */
     protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
             ProcessingElement... processingElements) {
         remoteStreams.addInputStream(getId(), clusterName, streamName);
@@ -364,9 +347,11 @@ public abstract class App {
 
     }
 
+    /**
+     * @see App#createInputStream(String, KeyFinder, ProcessingElement...)
+     */
     protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
-        remoteStreams.addInputStream(getId(), clusterName, streamName);
-        return createStream(streamName, processingElements);
+        return createInputStream(streamName, null, processingElements);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 9b6c030..3089cb6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -47,11 +47,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      *            we always register streams with the parent application.
      */
     public Stream(App app) {
-        // synchronized (Stream.class) {
-        // id = idCounter++;
-        // }
         this.app = app;
-        app.addStream(this);
         this.sender = app.getSender();
         this.receiver = app.getReceiver();
     }
@@ -93,12 +89,17 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      * @return the stream object
      */
     public Stream<T> setKey(KeyFinder<T> keyFinder) {
-        this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
+        if (keyFinder == null) {
+            this.key = null;
+        } else {
+            this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
+        }
         return this;
     }
 
-    void setEventType(Class<T> type) {
+    Stream<T> setEventType(Class<T> type) {
         this.eventType = type;
+        return this;
     }
 
     /**
@@ -308,4 +309,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
             }
         }
     }
+
+    public Stream<T> register(App app) {
+        app.addStream(this);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 935a3cc..df9291a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -10,6 +10,7 @@ import junit.framework.Assert;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
@@ -119,18 +120,42 @@ public class TestProducerConsumer {
 
         initializeS4Node();
 
-        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
-        zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+        CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
+        CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+                CommTestUtils.createZkClient());
+
+        boolean consumerStreamReady = true;
+        try {
+            zkClient.getChildren("/s4/streams/tickStream/consumers");
+        } catch (ZkNoNodeException e) {
+            consumerStreamReady = false;
+        }
+        Assert.assertFalse(consumerStreamReady);
+        final CountDownLatch signalConsumerReady = new CountDownLatch(1);
+
+        zkClient.subscribeChildChanges("/s4/streams/tickStream/consumers", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 1) {
+                    signalConsumerReady.countDown();
+                }
+
+            }
+        });
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
         zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT);
+        // TODO check that consumer app is ready in a better way than checking stream consumers
+        Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
 
-        CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
-                CommTestUtils.createZkClient());
-        Assert.assertTrue(signalConsumptionComplete.await(40, TimeUnit.SECONDS));
+        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
+        zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+
+        // that may be a bit long to complete...
+        Assert.assertTrue(signalConsumptionComplete.await(100, TimeUnit.SECONDS));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index eeceaed..d68a41a 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -116,7 +116,7 @@ final public class MyApp extends App {
         MyApp myApp = injector.getInstance(MyApp.class);
         Sender sender = injector.getInstance(Sender.class);
         Receiver receiver = injector.getInstance(Receiver.class);
-        myApp.setCommLayer(sender, receiver);
+        // myApp.setCommLayer(sender, receiver);
         myApp.init();
         myApp.start();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/317d8b1e/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
index 24f6b1f..4571157 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Controller.java
@@ -86,7 +86,7 @@ public class Controller {
 
             MyApp app = new MyApp(numClasses, numTrainVectors, model, outputInterval, TimeUnit.SECONDS);
 
-            app.setCommLayer(sender, receiver);
+            // app.setCommLayer(sender, receiver);
 
             logger.info("Init app.");
             app.initApp();