You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/13 17:31:04 UTC
git commit: Removed timer in producer for producer/consumer test:
messages are now sent at maximum rate, which highlights issues in the TCP emitter
Updated Branches:
refs/heads/S4-75 [created] 384e2c3d6
Removed timer in producer for producer/consumer test: messages are now sent at maximum rate,
which highlights issues in the TCP emitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/384e2c3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/384e2c3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/384e2c3d
Branch: refs/heads/S4-75
Commit: 384e2c3d602eb898ddf3a9e9626650c81977ace8
Parents: 4a30483
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jul 13 19:21:36 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jul 13 19:28:42 2012 +0200
----------------------------------------------------------------------
.../s4/deploy/prodcon/TestProducerConsumer.java | 4 ++--
.../src/main/java/s4app/ConsumerPE.java | 8 ++++----
.../src/main/java/s4app/ProducerApp.java | 5 +----
.../src/main/java/s4app/ProducerPE.java | 7 +++----
4 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index df9291a..2e05b7a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -121,7 +121,7 @@ public class TestProducerConsumer {
initializeS4Node();
CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+ CommTestUtils.watchAndSignalCreation("/AllTicksReceived", signalConsumptionComplete,
CommTestUtils.createZkClient());
boolean consumerStreamReady = true;
@@ -155,7 +155,7 @@ public class TestProducerConsumer {
zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
// that may be a bit long to complete...
- Assert.assertTrue(signalConsumptionComplete.await(100, TimeUnit.SECONDS));
+ Assert.assertTrue(signalConsumptionComplete.await(30, TimeUnit.SECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
index 1cc7ed0..bba8559 100644
--- a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -19,14 +19,14 @@ public class ConsumerPE extends ProcessingElement {
public void onEvent(Event event) {
eventCount++;
- logger.trace(
+ logger.info(
"Received event with tick {} and time {} for event # {}",
new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
String.valueOf(eventCount) });
- if (eventCount == 1000) {
- logger.info("Just reached 1000 events");
+ if (eventCount == 100000) {
+ logger.info("Just reached 100000 events");
ZkClient zkClient = new ZkClient("localhost:2181");
- zkClient.create("/1000TicksReceived", new byte[0], CreateMode.PERSISTENT);
+ zkClient.create("/AllTicksReceived", new byte[0], CreateMode.PERSISTENT);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
index 4b54112..66ed4dc 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
@@ -1,7 +1,5 @@
package s4app;
-import java.util.concurrent.TimeUnit;
-
import org.apache.s4.core.App;
public class ProducerApp extends App {
@@ -11,7 +9,7 @@ public class ProducerApp extends App {
@Override
protected void onStart() {
System.out.println("Starting CounterApp...");
- producerPE.getInstanceForKey("single");
+ ((ProducerPE) producerPE.getInstanceForKey("single")).sendMessages();
}
// generic array due to varargs generates a warning.
@@ -21,7 +19,6 @@ public class ProducerApp extends App {
producerPE = createPE(ProducerPE.class, "producer");
producerPE.setStreams(createOutputStream("tickStream"));
- producerPE.setTimerInterval(10, TimeUnit.MILLISECONDS);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/384e2c3d/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerPE.java b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
index 57cc7f3..c8a7597 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerPE.java
@@ -12,7 +12,6 @@ public class ProducerPE extends ProcessingElement {
private static final Logger logger = LoggerFactory.getLogger(ProducerPE.class);
private Streamable[] targetStreams;
- private long tick = 0;
public ProducerPE(App app) {
super(app);
@@ -26,10 +25,10 @@ public class ProducerPE extends ProcessingElement {
this.targetStreams = targetStreams;
}
- public void onTime() {
- if (tick < 1000) {
+ public void sendMessages() {
+ for (long tick = 1; tick <= 100000; tick++) {
Event event = new Event();
- event.put("tick", Long.class, tick++);
+ event.put("tick", Long.class, tick);
logger.trace("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
for (int i = 0; i < targetStreams.length; i++) {