You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/10/26 23:54:00 UTC

apex-malhar git commit: APEXMALHAR-2317 #resolve #comment Change SpillableBenchmarkApp to adapt to the changes in the Spillable data structures

Repository: apex-malhar
Updated Branches:
  refs/heads/master 5ae58d039 -> ea1b58f72


APEXMALHAR-2317 #resolve #comment Change SpillableBenchmarkApp to adapt to the changes in the Spillable data structures


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ea1b58f7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ea1b58f7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ea1b58f7

Branch: refs/heads/master
Commit: ea1b58f72e69ce0e1bfd35eb8b4b6756b1a5e0a0
Parents: 5ae58d0
Author: brightchen <br...@datatorrent.com>
Authored: Wed Oct 26 11:28:34 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Oct 26 14:56:35 2016 -0700

----------------------------------------------------------------------
 .../spillable/SpillableTestInputOperator.java    |  2 +-
 .../spillable/SpillableTestOperator.java         | 19 +++++++++++++++++++
 .../spillable/SpillableBenchmarkAppTester.java   | 13 ++++++++++++-
 3 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
index 2e33721..c3eafb0 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -27,7 +27,7 @@ public class SpillableTestInputOperator extends BaseOperator implements InputOpe
   public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
   public long count = 0;
   public int batchSize = 100;
-  public int sleepBetweenBatch = 1;
+  public int sleepBetweenBatch = 0;
 
   @Override
   public void emitTuples()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 3c5bf71..7c87b93 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -59,6 +59,9 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
 
   public static Throwable errorTrace;
 
+  private long lastLogTime;
+  private long beginTime;
+
   public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
   {
     @Override
@@ -89,8 +92,12 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
     }
 
     store.setup(context);
+    windowToCount.setup(context);
     multiMap.setup(context);
 
+    lastLogTime = System.currentTimeMillis();
+    beginTime = lastLogTime;
+
     checkData();
   }
 
@@ -148,9 +155,32 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
 
     if (windowId % 10 == 0) {
       checkData();
+      logStatistics();
     }
   }
 
+  private long lastTotalCount = 0;
+
+  /**
+   * Logs the overall and per-period tuple throughput and then starts a new
+   * period, so the next call reports only the tuples counted since this one.
+   * Elapsed times are clamped to at least 1 ms so that a call made within the
+   * same millisecond as setup or the previous call cannot divide by zero.
+   */
+  public void logStatistics()
+  {
+    final long now = System.currentTimeMillis();
+    final long countInPeriod = totalCount - lastTotalCount;
+    final long timeInPeriod = Math.max(1L, now - lastLogTime);
+    final long totalTime = Math.max(1L, now - beginTime);
+    logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+        totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
+
+    // Roll the period forward; otherwise the "period" figures always equal the totals.
+    lastLogTime = now;
+    lastTotalCount = totalCount;
+  }
+
   @Override
   public void beforeCheckpoint(long windowId)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
index 7f94079..cd2c640 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -18,12 +18,16 @@
  */
 package com.datatorrent.benchmark.spillable;
 
+import java.io.File;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
@@ -33,6 +37,13 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
 {
   private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
   public static final String basePath = "target/temp";
+
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
   @Test
   public void test() throws Exception
   {
@@ -55,7 +66,7 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
 
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
-    lc.run(60000);
+    lc.run(600000);
 
     lc.shutdown();