You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/10/26 23:54:00 UTC
apex-malhar git commit: APEXMALHAR-2317 #resolve #comment Change
SpillableBenchmarkApp to adapt the change on Spillable Data Structure
Repository: apex-malhar
Updated Branches:
refs/heads/master 5ae58d039 -> ea1b58f72
APEXMALHAR-2317 #resolve #comment Change SpillableBenchmarkApp to adapt the change on Spillable Data Structure
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ea1b58f7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ea1b58f7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ea1b58f7
Branch: refs/heads/master
Commit: ea1b58f72e69ce0e1bfd35eb8b4b6756b1a5e0a0
Parents: 5ae58d0
Author: brightchen <br...@datatorrent.com>
Authored: Wed Oct 26 11:28:34 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Oct 26 14:56:35 2016 -0700
----------------------------------------------------------------------
.../spillable/SpillableTestInputOperator.java | 2 +-
.../spillable/SpillableTestOperator.java | 19 +++++++++++++++++++
.../spillable/SpillableBenchmarkAppTester.java | 13 ++++++++++++-
3 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
index 2e33721..c3eafb0 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -27,7 +27,7 @@ public class SpillableTestInputOperator extends BaseOperator implements InputOpe
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
public long count = 0;
public int batchSize = 100;
- public int sleepBetweenBatch = 1;
+ public int sleepBetweenBatch = 0;
@Override
public void emitTuples()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 3c5bf71..7c87b93 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -59,6 +59,9 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
public static Throwable errorTrace;
+ private long lastLogTime;
+ private long beginTime;
+
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
@@ -89,8 +92,12 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
}
store.setup(context);
+ windowToCount.setup(context);
multiMap.setup(context);
+ lastLogTime = System.currentTimeMillis();
+ beginTime = lastLogTime;
+
checkData();
}
@@ -148,9 +155,21 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
if (windowId % 10 == 0) {
checkData();
+ logStatistics();
}
}
+ private long lastTotalCount = 0;
+
+  /**
+   * Logs throughput statistics: the running total count, the count seen since
+   * the previous call, and the corresponding per-second rates.
+   *
+   * After logging, the period markers ({@code lastLogTime} and
+   * {@code lastTotalCount}) are advanced so that the next call reports only
+   * what happened since this one. Without that, the "period" figures would
+   * always be measured from setup and simply duplicate the totals.
+   *
+   * NOTE(review): assumes {@code totalCount} is an integral counter declared
+   * elsewhere in this class - confirm against the full source.
+   */
+  public void logStatistics()
+  {
+    // Read the clock once so both durations share the same reference point.
+    long now = System.currentTimeMillis();
+    long countInPeriod = totalCount - lastTotalCount;
+    // Clamp to 1 ms: a call in the same millisecond as the reference timestamp
+    // would otherwise divide by zero below.
+    long timeInPeriod = Math.max(1, now - lastLogTime);
+    long totalTime = Math.max(1, now - beginTime);
+    logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+        totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
+    // Start a new measurement period.
+    lastLogTime = now;
+    lastTotalCount = totalCount;
+  }
+
@Override
public void beforeCheckpoint(long windowId)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
index 7f94079..cd2c640 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -18,12 +18,16 @@
*/
package com.datatorrent.benchmark.spillable;
+import java.io.File;
+
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
@@ -33,6 +37,13 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
{
private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
public static final String basePath = "target/temp";
+
+  /**
+   * JUnit fixture run before each test: deletes the directory named by
+   * {@link #basePath} and everything under it. The result of the delete is
+   * not checked.
+   */
+  @Before
+  public void before()
+  {
+    final File tempDir = new File(basePath);
+    FileUtil.fullyDelete(tempDir);
+  }
+
@Test
public void test() throws Exception
{
@@ -55,7 +66,7 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
// Create local cluster
final LocalMode.Controller lc = lma.getController();
- lc.run(60000);
+ lc.run(600000);
lc.shutdown();