You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/11 16:31:30 UTC

[2/2] git commit: S4-91 Minor cleanups

Updated Branches:
  refs/heads/S4-91 16d50da32 -> be95df13b


S4-91 Minor cleanups


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/be95df13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/be95df13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/be95df13

Branch: refs/heads/S4-91
Commit: be95df13ba4ce8459663cd4c6b76f5fa0df8abac
Parents: 6aaa553
Author: Daniel Gómez Ferro <df...@apache.org>
Authored: Mon Mar 11 17:07:26 2013 +0100
Committer: Daniel Gómez Ferro <df...@apache.org>
Committed: Mon Mar 11 17:16:35 2013 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Sender.java   |    1 -
 .../org/apache/s4/comm/topology/PartitionData.java |   20 +++-
 .../src/main/java/org/apache/s4/core/App.java      |    9 +-
 .../java/org/apache/s4/core/ProcessingElement.java |   27 +++--
 .../main/java/org/apache/s4/core/RemoteSender.java |    4 +-
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |    1 -
 .../main/java/org/apache/s4/core/SenderImpl.java   |    2 -
 .../apache/s4/core/ri/IsolationWordCounterPE.java  |  111 +++++++++++++++
 .../org/apache/s4/core/ri/RemoteAdapterApp.java    |   18 +++
 .../org/apache/s4/core/ri/RemoteStreamRITest.java  |  103 ++++----------
 .../apache/s4/core/ri/RuntimeIsolationTest.java    |    1 -
 .../org/apache/s4/fixtures/MockCommModule.java     |    4 +
 .../s4/wordcount/IsolationWordCounterPE.java       |   91 ------------
 13 files changed, 205 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
index be2c06d..a634cdc 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
@@ -18,7 +18,6 @@ public interface Sender {
      * @return true if the event was sent because the destination is <b>not</b> local.
      * 
      */
-    // boolean checkAndSendIfNotLocal(String hashKey, Event event);
     boolean checkAndSendIfNotLocal(int partition, Event event);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
index 691cb34..54428ba 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.s4.comm.topology;
 
 import java.util.ArrayList;
@@ -60,7 +78,7 @@ public class PartitionData {
         this.isExclusive = isExclusive;
     }
 
-    public int getGlobalePartitionId(int partitionId) {
+    public int getGlobalPartitionId(int partitionId) {
         return globalPartitionMap.get(partitionId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c724ff6..8bb33d5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -79,15 +79,12 @@ public abstract class App {
     @Inject
     private Hasher hasher;
 
+    @Inject
     private ZkClient zkClient;
 
     @Inject
     private RemoteStreams remoteStreams;
 
-    public void setZkClient(ZkClient zkClient) {
-        this.zkClient = zkClient;
-    }
-
     @Inject
     private Cluster topology;
 
@@ -241,7 +238,7 @@ public abstract class App {
 
     public void schedule() {
         schedule(topology);
-        writeToZK();
+        writePartitionDataToZK();
     }
 
     /**
@@ -250,7 +247,7 @@ public abstract class App {
      * 
      * @param pes
      */
-    private void writeToZK() {
+    private void writePartitionDataToZK() {
         List<String> streamsOfNEPE = new ArrayList<String>();
         ProcessingElement NEPeInstance = null;
         for (int i = 0; i < getPePrototypes().size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 09ddd0d..f54f516 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -180,19 +180,19 @@ public abstract class ProcessingElement implements Cloneable {
 
     }
 
-    public void addGlobalPartitionId(int partitionId, int nodeId) {
+    void addGlobalPartitionId(int partitionId, int nodeId) {
         partitionData.addPartitionMappingInfo(partitionId, nodeId);
     }
 
-    public int getGlobalPartitionId(int partitionId) {
-        return partitionData.getGlobalePartitionId(partitionId);
+    int getGlobalPartitionId(int partitionId) {
+        return partitionData.getGlobalPartitionId(partitionId);
     }
 
-    public void addInputStream(String stream) {
+    void addInputStream(String stream) {
         partitionData.addStream(stream);
     }
 
-    public List<String> getInputStreams() {
+    List<String> getInputStreams() {
         return partitionData.getStreams();
     }
 
@@ -510,22 +510,25 @@ public abstract class ProcessingElement implements Cloneable {
         clearDirty();
     }
 
+    /**
+     * @return whether this PE runs exclusively on some partitions
+     */
     public boolean isExclusive() {
         return partitionData.isExclusive();
     }
 
     /**
-     * If set a PE to be exclusive, user need give the partition count of this PE
+     * Forces this PE to run on the given number of partitions exclusively, i.e., no other PE will share
+     * the same partition.
      * 
-     * @param isExclusive
-     * @param partitionCount
+     * @param partitionCount Number of partitions to run exclusively on
      */
     public void setExclusive(int partitionCount) {
         this.partitionData.setExclusive(true);
         this.partitionData.setPartitionCount(partitionCount);
     }
 
-    public void setPartitionCount(int partitionCount) {
+    void setPartitionCount(int partitionCount) {
 
         this.partitionData.setPartitionCount(partitionCount);
         for (int i = 0; i < partitionCount; i++) {
@@ -533,6 +536,12 @@ public abstract class ProcessingElement implements Cloneable {
         }
     }
 
+    /**
+     * Returns the number of partitions this PE runs on. It can be different than the total number of
+     * partitions for the application if some PEs (this one or others) run exclusively on some partitions.
+     * 
+     * @return Number of partitions this PE runs on
+     */
     public int getPartitionCount() {
         return partitionData.getPartitionCount();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 6cf98e8..90cc087 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -60,16 +60,14 @@ public class RemoteSender {
         
         Set<Integer> partitions = new HashSet<Integer>();
 
-        logger.warn("Remote sending with hash: " + hashKey);
         int hashValue = (hashKey == null) ? targetPartition.incrementAndGet() : (int) hasher.hash(hashKey);
 
         for (String prototype : partitionDatas.keySet()) {
             PartitionData data = partitionDatas.get(prototype);
-            partitions.add(data.getGlobalePartitionId(hashValue % data.getPartitionCount()));
+            partitions.add(data.getGlobalPartitionId(hashValue % data.getPartitionCount()));
         }
 
         for (Integer partition : partitions) {
-            logger.warn("Remote sending to partition: " + partition);
             emitter.send(partition, message);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 64bc031..874ffe1 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -134,7 +134,6 @@ public class S4Bootstrap {
             // use correct classLoader for running the app initialization
             Thread.currentThread().setContextClassLoader(app.getClass().getClassLoader());
 
-            app.setZkClient(zkClient);
             app.init();
             app.schedule();
             app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 6345a5d..48ee248 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -93,8 +93,6 @@ public class SenderImpl implements Sender {
      */
     @Override
     public boolean checkAndSendIfNotLocal(int partition, Event event) {
-        // public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        // int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
         if (partition == localPartitionId) {
             metrics.sentLocal();
             /* Hey we are in the same JVM, don't use the network. */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
new file mode 100644
index 0000000..2a0bb2d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core.ri;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.s4.wordcount.WordCountEvent;
+import org.apache.s4.wordcount.WordSeenEvent;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
+    private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
+
+    transient private ZooKeeper zk;
+    private String zkPath;
+    static int count = 0;
+    int wordCounter;
+    transient Stream<WordCountEvent> wordClassifierStream;
+    public static AtomicInteger prototypeId = new AtomicInteger();
+
+    private IsolationWordCounterPE() {
+
+    }
+
+    public IsolationWordCounterPE(App app) {
+        super(app);
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:2181", 4000, this);
+                synchronized (prototypeId) {
+                    zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
+                            + System.currentTimeMillis();
+                    zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void onEvent(WordSeenEvent event) {
+
+        wordCounter++;
+        System.out.println("seen word " + event.getWord());
+
+        // NOTE: it seems the id is the key for now...
+        wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+        // Update the zookeeper
+        synchronized (this) {
+            count++;
+            try {
+                zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
+                logger.info("set " + zkPath + " " + count);
+            } catch (KeeperException e) {
+                logger.error(zkPath + " " + count, e);
+            } catch (InterruptedException e) {
+                logger.error(zkPath + " " + count, e);
+            }
+        }
+    }
+
+    public void setWordClassifierStream(Stream<WordCountEvent> stream) {
+        this.wordClassifierStream = stream;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
index 91f0a85..ea3ff12 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.s4.core.ri;
 
 import org.apache.s4.base.Event;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
index 0757f5e..0a5a4c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
@@ -1,13 +1,27 @@
-package org.apache.s4.core.ri;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import static org.apache.s4.core.ri.RuntimeIsolationTest.counterNumber;
+package org.apache.s4.core.ri;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.comm.topology.ZkClient;
@@ -16,31 +30,19 @@ import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.wordcount.WordCountModule;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+/**
+ * Reuse {@link RuntimeIsolationTest} but using an external adapter instead of injecting the events directly.
+ * 
+ * Test case is {@link RuntimeIsolationTest#testSimple()}
+ * 
+ */
 public class RemoteStreamRITest extends RuntimeIsolationTest {
 
-    private static Logger logger = LoggerFactory.getLogger(RemoteStreamRITest.class);
-
-    @Override
-    public void injectData() throws InterruptedException, IOException {
-        // Use remote stream
-
-    }
-
     @Override
     public void startNodes() throws IOException, InterruptedException {
-        final ZooKeeper zk = CommTestUtils.createZkClient();
-
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
         taskSetup.setup("cluster2", 1, 1500);
 
@@ -62,58 +64,15 @@ public class RemoteStreamRITest extends RuntimeIsolationTest {
                 "localhost:2181"), 10, "cluster2", 1)));
 
         s4nodes = nodes.toArray(new Process[] {});
-        
-        CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        try {
-            CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
-            // add authorizations for processing
-            for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
-                zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-            }
-
-//            injectData();
-
-            Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
-            String results = new String(zk.getData("/results", false, null));
-            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
-
-            List<String> counterInstances = zk.getChildren("/counters", false);
-
-            int totalCount = 0;
-            int activeInstances = 0;
-            for (String instance : counterInstances) {
-                int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
-                if (count != 0) {
-                    activeInstances++;
-                }
-                totalCount += count;
-            }
-            Assert.assertEquals(numberTasks, counterInstances.size());
-            Assert.assertEquals(counterNumber, activeInstances);
-
-            Assert.assertEquals(13, totalCount);
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        }
-        
     }
-
+    
     @Override
-    @Test
-    public void testSimple() {
-        ZooKeeper zk;
-        try {
-            zk = CommTestUtils.createZkClient();
-            zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            startNodes();
-
-        } catch (IOException e) {
-            logger.error("", e);
-        } catch (KeeperException e) {
-            logger.error("", e);
-        } catch (InterruptedException e) {
-            logger.error("", e);
-        }
+    public void createEmitter() throws IOException {
+        // No need for an emitter, we use an adapter
+    }
 
+    @Override
+    public void injectData() throws InterruptedException, IOException {
+        // No need for data injection, we use an adapter
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
index 0b9b064..339d0b4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
@@ -31,7 +31,6 @@ import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
-import org.apache.s4.wordcount.IsolationWordCounterPE;
 import org.apache.s4.wordcount.SentenceKeyFinder;
 import org.apache.s4.wordcount.WordClassifierPE;
 import org.apache.s4.wordcount.WordCountEvent;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index c4f1772..f10e263 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -32,10 +32,13 @@ import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.PhysicalCluster;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.ZkClientProvider;
 import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
@@ -68,6 +71,7 @@ public class MockCommModule extends AbstractModule {
         bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
         install(new FactoryModuleBuilder().implement(RemoteEmitter.class, Mockito.mock(RemoteEmitter.class).getClass())
                 .build(RemoteEmitterFactory.class));
+        bind(ZkClient.class).toInstance(Mockito.mock(ZkClient.class));
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
deleted file mode 100644
index fc8a891..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.s4.wordcount;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
-    private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
-
-    transient private ZooKeeper zk;
-    private String zkPath;
-    static int count = 0;
-    int wordCounter;
-    transient Stream<WordCountEvent> wordClassifierStream;
-    public static AtomicInteger prototypeId = new AtomicInteger();
-
-    private IsolationWordCounterPE() {
-
-    }
-
-    public IsolationWordCounterPE(App app) {
-        super(app);
-        if (zk == null) {
-            try {
-                zk = new ZooKeeper("localhost:2181", 4000, this);
-                synchronized (prototypeId) {
-                    zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
-                            + System.currentTimeMillis();
-                    zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public void onEvent(WordSeenEvent event) {
-
-        wordCounter++;
-        System.out.println("seen word " + event.getWord());
-
-        // NOTE: it seems the id is the key for now...
-        wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
-        // Update the zookeeper
-        synchronized (this) {
-            count++;
-            try {
-                zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
-                logger.info("set " + zkPath + " " + count);
-            } catch (KeeperException e) {
-                logger.error(zkPath + " " + count, e);
-            } catch (InterruptedException e) {
-                logger.error(zkPath + " " + count, e);
-            }
-        }
-    }
-
-    public void setWordClassifierStream(Stream<WordCountEvent> stream) {
-        this.wordClassifierStream = stream;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}