You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/11 16:31:30 UTC
[2/2] git commit: S4-91 Minor cleanups
Updated Branches:
refs/heads/S4-91 16d50da32 -> be95df13b
S4-91 Minor cleanups
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/be95df13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/be95df13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/be95df13
Branch: refs/heads/S4-91
Commit: be95df13ba4ce8459663cd4c6b76f5fa0df8abac
Parents: 6aaa553
Author: Daniel Gómez Ferro <df...@apache.org>
Authored: Mon Mar 11 17:07:26 2013 +0100
Committer: Daniel Gómez Ferro <df...@apache.org>
Committed: Mon Mar 11 17:16:35 2013 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Sender.java | 1 -
.../org/apache/s4/comm/topology/PartitionData.java | 20 +++-
.../src/main/java/org/apache/s4/core/App.java | 9 +-
.../java/org/apache/s4/core/ProcessingElement.java | 27 +++--
.../main/java/org/apache/s4/core/RemoteSender.java | 4 +-
.../main/java/org/apache/s4/core/S4Bootstrap.java | 1 -
.../main/java/org/apache/s4/core/SenderImpl.java | 2 -
.../apache/s4/core/ri/IsolationWordCounterPE.java | 111 +++++++++++++++
.../org/apache/s4/core/ri/RemoteAdapterApp.java | 18 +++
.../org/apache/s4/core/ri/RemoteStreamRITest.java | 103 ++++----------
.../apache/s4/core/ri/RuntimeIsolationTest.java | 1 -
.../org/apache/s4/fixtures/MockCommModule.java | 4 +
.../s4/wordcount/IsolationWordCounterPE.java | 91 ------------
13 files changed, 205 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
index be2c06d..a634cdc 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
@@ -18,7 +18,6 @@ public interface Sender {
* @return true if the event was sent because the destination is <b>not</b> local.
*
*/
- // boolean checkAndSendIfNotLocal(String hashKey, Event event);
boolean checkAndSendIfNotLocal(int partition, Event event);
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
index 691cb34..54428ba 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.s4.comm.topology;
import java.util.ArrayList;
@@ -60,7 +78,7 @@ public class PartitionData {
this.isExclusive = isExclusive;
}
- public int getGlobalePartitionId(int partitionId) {
+ public int getGlobalPartitionId(int partitionId) {
return globalPartitionMap.get(partitionId);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c724ff6..8bb33d5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -79,15 +79,12 @@ public abstract class App {
@Inject
private Hasher hasher;
+ @Inject
private ZkClient zkClient;
@Inject
private RemoteStreams remoteStreams;
- public void setZkClient(ZkClient zkClient) {
- this.zkClient = zkClient;
- }
-
@Inject
private Cluster topology;
@@ -241,7 +238,7 @@ public abstract class App {
public void schedule() {
schedule(topology);
- writeToZK();
+ writePartitionDataToZK();
}
/**
@@ -250,7 +247,7 @@ public abstract class App {
*
* @param pes
*/
- private void writeToZK() {
+ private void writePartitionDataToZK() {
List<String> streamsOfNEPE = new ArrayList<String>();
ProcessingElement NEPeInstance = null;
for (int i = 0; i < getPePrototypes().size(); i++) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 09ddd0d..f54f516 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -180,19 +180,19 @@ public abstract class ProcessingElement implements Cloneable {
}
- public void addGlobalPartitionId(int partitionId, int nodeId) {
+ void addGlobalPartitionId(int partitionId, int nodeId) {
partitionData.addPartitionMappingInfo(partitionId, nodeId);
}
- public int getGlobalPartitionId(int partitionId) {
- return partitionData.getGlobalePartitionId(partitionId);
+ int getGlobalPartitionId(int partitionId) {
+ return partitionData.getGlobalPartitionId(partitionId);
}
- public void addInputStream(String stream) {
+ void addInputStream(String stream) {
partitionData.addStream(stream);
}
- public List<String> getInputStreams() {
+ List<String> getInputStreams() {
return partitionData.getStreams();
}
@@ -510,22 +510,25 @@ public abstract class ProcessingElement implements Cloneable {
clearDirty();
}
+ /**
+ * @return whether this PE runs exclusively on some partitions
+ */
public boolean isExclusive() {
return partitionData.isExclusive();
}
/**
- * If set a PE to be exclusive, user need give the partition count of this PE
+ * Forces this PE to run on the given number of partitions exclusively, i.e., no other PE will share
+ * the same partition.
*
- * @param isExclusive
- * @param partitionCount
+ * @param partitionCount Number of partitions to run exclusively on
*/
public void setExclusive(int partitionCount) {
this.partitionData.setExclusive(true);
this.partitionData.setPartitionCount(partitionCount);
}
- public void setPartitionCount(int partitionCount) {
+ void setPartitionCount(int partitionCount) {
this.partitionData.setPartitionCount(partitionCount);
for (int i = 0; i < partitionCount; i++) {
@@ -533,6 +536,12 @@ public abstract class ProcessingElement implements Cloneable {
}
}
+ /**
+ * Returns the number of partitions this PE runs on. It can be different than the total number of
+ * partitions for the application if some PEs (this one or others) run exclusively on some partitions.
+ *
+ * @return Number of partitions this PE runs on
+ */
public int getPartitionCount() {
return partitionData.getPartitionCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 6cf98e8..90cc087 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -60,16 +60,14 @@ public class RemoteSender {
Set<Integer> partitions = new HashSet<Integer>();
- logger.warn("Remote sending with hash: " + hashKey);
int hashValue = (hashKey == null) ? targetPartition.incrementAndGet() : (int) hasher.hash(hashKey);
for (String prototype : partitionDatas.keySet()) {
PartitionData data = partitionDatas.get(prototype);
- partitions.add(data.getGlobalePartitionId(hashValue % data.getPartitionCount()));
+ partitions.add(data.getGlobalPartitionId(hashValue % data.getPartitionCount()));
}
for (Integer partition : partitions) {
- logger.warn("Remote sending to partition: " + partition);
emitter.send(partition, message);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 64bc031..874ffe1 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -134,7 +134,6 @@ public class S4Bootstrap {
// use correct classLoader for running the app initialization
Thread.currentThread().setContextClassLoader(app.getClass().getClassLoader());
- app.setZkClient(zkClient);
app.init();
app.schedule();
app.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 6345a5d..48ee248 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -93,8 +93,6 @@ public class SenderImpl implements Sender {
*/
@Override
public boolean checkAndSendIfNotLocal(int partition, Event event) {
- // public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- // int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
if (partition == localPartitionId) {
metrics.sentLocal();
/* Hey we are in the same JVM, don't use the network. */
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
new file mode 100644
index 0000000..2a0bb2d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/IsolationWordCounterPE.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core.ri;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.s4.wordcount.WordCountEvent;
+import org.apache.s4.wordcount.WordSeenEvent;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
+ private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
+
+ transient private ZooKeeper zk;
+ private String zkPath;
+ static int count = 0;
+ int wordCounter;
+ transient Stream<WordCountEvent> wordClassifierStream;
+ public static AtomicInteger prototypeId = new AtomicInteger();
+
+ private IsolationWordCounterPE() {
+
+ }
+
+ public IsolationWordCounterPE(App app) {
+ super(app);
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:2181", 4000, this);
+ synchronized (prototypeId) {
+ zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
+ + System.currentTimeMillis();
+ zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void onEvent(WordSeenEvent event) {
+
+ wordCounter++;
+ System.out.println("seen word " + event.getWord());
+
+ // NOTE: it seems the id is the key for now...
+ wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+ // Update the zookeeper
+ synchronized (this) {
+ count++;
+ try {
+ zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
+ logger.info("set " + zkPath + " " + count);
+ } catch (KeeperException e) {
+ logger.error(zkPath + " " + count, e);
+ } catch (InterruptedException e) {
+ logger.error(zkPath + " " + count, e);
+ }
+ }
+ }
+
+ public void setWordClassifierStream(Stream<WordCountEvent> stream) {
+ this.wordClassifierStream = stream;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
index 91f0a85..ea3ff12 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.s4.core.ri;
import org.apache.s4.base.Event;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
index 0757f5e..0a5a4c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
@@ -1,13 +1,27 @@
-package org.apache.s4.core.ri;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import static org.apache.s4.core.ri.RuntimeIsolationTest.counterNumber;
+package org.apache.s4.core.ri;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.comm.topology.ZkClient;
@@ -16,31 +30,19 @@ import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.wordcount.WordCountModule;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
+/**
+ * Reuse {@link RuntimeIsolationTest} but using an external adapter instead of injecting the events directly.
+ *
+ * Test case is {@link RuntimeIsolationTest#testSimple()}
+ *
+ */
public class RemoteStreamRITest extends RuntimeIsolationTest {
- private static Logger logger = LoggerFactory.getLogger(RemoteStreamRITest.class);
-
- @Override
- public void injectData() throws InterruptedException, IOException {
- // Use remote stream
-
- }
-
@Override
public void startNodes() throws IOException, InterruptedException {
- final ZooKeeper zk = CommTestUtils.createZkClient();
-
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.setup("cluster2", 1, 1500);
@@ -62,58 +64,15 @@ public class RemoteStreamRITest extends RuntimeIsolationTest {
"localhost:2181"), 10, "cluster2", 1)));
s4nodes = nodes.toArray(new Process[] {});
-
- CountDownLatch signalTextProcessed = new CountDownLatch(1);
- try {
- CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
- // add authorizations for processing
- for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
- zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
-
-// injectData();
-
- Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
- String results = new String(zk.getData("/results", false, null));
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
-
- List<String> counterInstances = zk.getChildren("/counters", false);
-
- int totalCount = 0;
- int activeInstances = 0;
- for (String instance : counterInstances) {
- int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
- if (count != 0) {
- activeInstances++;
- }
- totalCount += count;
- }
- Assert.assertEquals(numberTasks, counterInstances.size());
- Assert.assertEquals(counterNumber, activeInstances);
-
- Assert.assertEquals(13, totalCount);
- } catch (KeeperException e) {
- e.printStackTrace();
- }
-
}
-
+
@Override
- @Test
- public void testSimple() {
- ZooKeeper zk;
- try {
- zk = CommTestUtils.createZkClient();
- zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- startNodes();
-
- } catch (IOException e) {
- logger.error("", e);
- } catch (KeeperException e) {
- logger.error("", e);
- } catch (InterruptedException e) {
- logger.error("", e);
- }
+ public void createEmitter() throws IOException {
+ // No need for an emitter, we use an adapter
+ }
+ @Override
+ public void injectData() throws InterruptedException, IOException {
+ // No need for data injection, we use an adapter
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
index 0b9b064..339d0b4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
@@ -31,7 +31,6 @@ import org.apache.s4.core.util.AppConfig;
import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
-import org.apache.s4.wordcount.IsolationWordCounterPE;
import org.apache.s4.wordcount.SentenceKeyFinder;
import org.apache.s4.wordcount.WordClassifierPE;
import org.apache.s4.wordcount.WordCountEvent;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index c4f1772..f10e263 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -32,10 +32,13 @@ import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.PhysicalCluster;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.ZkClientProvider;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
@@ -68,6 +71,7 @@ public class MockCommModule extends AbstractModule {
bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
install(new FactoryModuleBuilder().implement(RemoteEmitter.class, Mockito.mock(RemoteEmitter.class).getClass())
.build(RemoteEmitterFactory.class));
+ bind(ZkClient.class).toInstance(Mockito.mock(ZkClient.class));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/be95df13/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
deleted file mode 100644
index fc8a891..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.s4.wordcount;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
- private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
-
- transient private ZooKeeper zk;
- private String zkPath;
- static int count = 0;
- int wordCounter;
- transient Stream<WordCountEvent> wordClassifierStream;
- public static AtomicInteger prototypeId = new AtomicInteger();
-
- private IsolationWordCounterPE() {
-
- }
-
- public IsolationWordCounterPE(App app) {
- super(app);
- if (zk == null) {
- try {
- zk = new ZooKeeper("localhost:2181", 4000, this);
- synchronized (prototypeId) {
- zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
- + System.currentTimeMillis();
- zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public void onEvent(WordSeenEvent event) {
-
- wordCounter++;
- System.out.println("seen word " + event.getWord());
-
- // NOTE: it seems the id is the key for now...
- wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
- // Update the zookeeper
- synchronized (this) {
- count++;
- try {
- zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
- logger.info("set " + zkPath + " " + count);
- } catch (KeeperException e) {
- logger.error(zkPath + " " + count, e);
- } catch (InterruptedException e) {
- logger.error(zkPath + " " + count, e);
- }
- }
- }
-
- public void setWordClassifierStream(Stream<WordCountEvent> stream) {
- this.wordClassifierStream = stream;
- }
-
- @Override
- public void process(WatchedEvent event) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
-}