You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/13 17:31:04 UTC

git commit: Removed timer in producer for producer/consumer test : messages now sent at maximum rate - highlights issues in tcp emitter

Updated Branches:
  refs/heads/S4-75 [created] 384e2c3d6


Removed timer in producer for producer/consumer test : messages now sent at maximum rate
- highlights issues in tcp emitter


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/384e2c3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/384e2c3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/384e2c3d

Branch: refs/heads/S4-75
Commit: 384e2c3d602eb898ddf3a9e9626650c81977ace8
Parents: 4a30483
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 13 19:21:36 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 13 19:28:42 2012 +0200

----------------------------------------------------------------------
 .../s4/deploy/prodcon/TestProducerConsumer.java    |    4 ++--
 .../src/main/java/s4app/ConsumerPE.java            |    8 ++++----
 .../src/main/java/s4app/ProducerApp.java           |    5 +----
 .../src/main/java/s4app/ProducerPE.java            |    7 +++----
 4 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index df9291a..2e05b7a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -121,7 +121,7 @@ public class TestProducerConsumer {
         initializeS4Node();
 
         CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+        CommTestUtils.watchAndSignalCreation("/AllTicksReceived", signalConsumptionComplete,
                 CommTestUtils.createZkClient());
 
         boolean consumerStreamReady = true;
@@ -155,7 +155,7 @@ public class TestProducerConsumer {
         zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
 
         // that may be a bit long to complete...
-        Assert.assertTrue(signalConsumptionComplete.await(100, TimeUnit.SECONDS));
+        Assert.assertTrue(signalConsumptionComplete.await(30, TimeUnit.SECONDS));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
index 1cc7ed0..bba8559 100644
--- a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -19,14 +19,14 @@ public class ConsumerPE extends ProcessingElement {
 
     public void onEvent(Event event) {
         eventCount++;
-        logger.trace(
+        logger.info(
                 "Received event with tick {} and time {} for event # {}",
                 new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
                         String.valueOf(eventCount) });
-        if (eventCount == 1000) {
-            logger.info("Just reached 1000 events");
+        if (eventCount == 100000) {
+            logger.info("Just reached 100000 events");
             ZkClient zkClient = new ZkClient("localhost:2181");
-            zkClient.create("/1000TicksReceived", new byte[0], CreateMode.PERSISTENT);
+            zkClient.create("/AllTicksReceived", new byte[0], CreateMode.PERSISTENT);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
index 4b54112..66ed4dc 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
@@ -1,7 +1,5 @@
 package s4app;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.s4.core.App;
 
 public class ProducerApp extends App {
@@ -11,7 +9,7 @@ public class ProducerApp extends App {
     @Override
     protected void onStart() {
         System.out.println("Starting CounterApp...");
-        producerPE.getInstanceForKey("single");
+        ((ProducerPE) producerPE.getInstanceForKey("single")).sendMessages();
     }
 
     // generic array due to varargs generates a warning.
@@ -21,7 +19,6 @@ public class ProducerApp extends App {
 
         producerPE = createPE(ProducerPE.class, "producer");
         producerPE.setStreams(createOutputStream("tickStream"));
-        producerPE.setTimerInterval(10, TimeUnit.MILLISECONDS);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerPE.java b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
index 57cc7f3..c8a7597 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
@@ -12,7 +12,6 @@ public class ProducerPE extends ProcessingElement {
     private static final Logger logger = LoggerFactory.getLogger(ProducerPE.class);
 
     private Streamable[] targetStreams;
-    private long tick = 0;
 
     public ProducerPE(App app) {
         super(app);
@@ -26,10 +25,10 @@ public class ProducerPE extends ProcessingElement {
         this.targetStreams = targetStreams;
     }
 
-    public void onTime() {
-        if (tick < 1000) {
+    public void sendMessages() {
+        for (long tick = 1; tick <= 100000; tick++) {
             Event event = new Event();
-            event.put("tick", Long.class, tick++);
+            event.put("tick", Long.class, tick);
 
             logger.trace("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
             for (int i = 0; i < targetStreams.length; i++) {