You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/16 16:29:29 UTC
apex-malhar git commit: APEXMALHAR-2205 #resolve #comment State
management benchmark - add update
Repository: apex-malhar
Updated Branches:
refs/heads/master 9f9da0ee1 -> b6c48bb30
APEXMALHAR-2205 #resolve #comment State management benchmark - add update
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b6c48bb3
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b6c48bb3
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b6c48bb3
Branch: refs/heads/master
Commit: b6c48bb30bc1ba58ddb547c9c6bcad1ef3547a38
Parents: 9f9da0e
Author: brightchen <br...@datatorrent.com>
Authored: Wed Sep 14 15:21:35 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Thu Sep 15 16:31:37 2016 -0700
----------------------------------------------------------------------
.../state/ManagedStateBenchmarkApp.java | 21 +--
.../benchmark/state/StoreOperator.java | 141 ++++++++++++++++++-
.../src/main/resources/META-INF/properties.xml | 5 +
.../state/ManagedStateBenchmarkAppTester.java | 37 ++++-
4 files changed, 186 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index 25e3971..7d9c3ba 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -26,7 +26,7 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Lists;
@@ -48,32 +48,35 @@ import com.datatorrent.lib.util.KeyValPair;
@ApplicationAnnotation(name = "ManagedStateBenchmark")
public class ManagedStateBenchmarkApp implements StreamingApplication
{
- private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class);
-
protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath";
protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store";
+ // Store operator built by populateDAG(); kept as a field (not a local) so subclasses such as
+ // ManagedStateBenchmarkAppTester can adjust it after the DAG has been populated.
+ protected StoreOperator storeOperator;
+ // Passed to both TestGenerator.setRange() and StoreOperator.setTimeRange() in populateDAG().
+ protected int timeRange = 1000 * 60; // one minute range of hot keys
+
+ /**
+ * Builds the benchmark DAG: a TestGenerator ("Generator") feeding a StoreOperator ("Store") over the
+ * CONTAINER_LOCAL "Events" stream. Both operators share one TestStatsListener, whose adjustRate flag is
+ * read from dt.ManagedStateBenchmark.adjustRate (default false), and both are configured with timeRange.
+ */
@Override
public void populateDAG(DAG dag, Configuration conf)
{
TestStatsListener sl = new TestStatsListener();
sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+ gen.setRange(timeRange);
dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
- StoreOperator storeOperator = new StoreOperator();
+ storeOperator = new StoreOperator();
storeOperator.setStore(createStore(conf));
- StoreOperator store = dag.addOperator("Store", storeOperator);
+ storeOperator.setTimeRange(timeRange);
+ storeOperator = dag.addOperator("Store", storeOperator);
- dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+ dag.setAttribute(storeOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
- dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL);
+ dag.addStream("Events", gen.data, storeOperator.input).setLocality(Locality.CONTAINER_LOCAL);
}
- public ManagedStateImpl createStore(Configuration conf)
+ /**
+ * Creates the state store used by the Store operator, with its file base path taken from getStoreBasePath(conf).
+ * The cast below assumes the store's file access is a TFileImpl.DTFileImpl; any other implementation
+ * would fail here with a ClassCastException.
+ */
+ public ManagedTimeUnifiedStateImpl createStore(Configuration conf)
{
String basePath = getStoreBasePath(conf);
- ManagedStateImpl store = new ManagedStateImpl();
+ ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl();
((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
return store;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 0d9c42b..2530611 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -18,10 +18,18 @@
*/
package com.datatorrent.benchmark.state;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+
+import com.google.common.collect.Maps;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
@@ -34,9 +42,17 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
{
private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
+ /**
+ * Selects which store operation processTuple() performs for every incoming tuple.
+ */
+ public static enum ExecMode
+ {
+ // put the tuple only
+ INSERT,
+ // blocking getSync of the key, then put the tuple
+ UPDATESYNC,
+ // getAsync of the key; the tuple is put once that get has completed
+ UPDATEASYNC
+ }
+
+ // NOTE(review): presumably the number of windows between logStatistics() calls — confirm in endWindow (not visible here).
protected static final int numOfWindowPerStatistics = 10;
- protected ManagedStateImpl store;
+ // the state store under benchmark; injected with setStore() and set up in setup()
+ protected ManagedTimeUnifiedStateImpl store;
protected long bucketId = 1;
protected long lastCheckPointWindowId = -1;
@@ -44,7 +60,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
protected long tupleCount = 0;
protected int windowCountPerStatistics = 0;
protected long statisticsBeginTime = 0;
-
+
+ // Operation performed by processTuple() for each tuple; INSERT unless configured otherwise.
+ protected ExecMode execMode = ExecMode.INSERT;
+ // Width of the time buckets computed by getTimeByKey() (default: one minute in milliseconds).
+ protected int timeRange = 1000 * 60;
+
public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
{
@Override
@@ -57,6 +76,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
+ /**
+ * Logs the configured execution mode, then sets up the underlying state store.
+ */
@Override
public void setup(OperatorContext context)
{
+ logger.info("The execute mode is: {}", execMode.name());
store.setup(context);
}
@@ -80,10 +100,83 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
}
}
+ // Outstanding async get requests, in submission order (used by updateAsync).
+ protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
+ // Maps each outstanding async get request back to the tuple that issued it.
+ protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap();
+
+ /**
+ * Applies the operation selected by execMode to one tuple.
+ * INSERT puts the tuple; UPDATESYNC does a blocking getSync of the key and then puts the tuple;
+ * UPDATEASYNC delegates to updateAsync(), which issues a getAsync and puts the tuple after the get completes.
+ * @param tuple the key/value pair to process
+ */
protected void processTuple(KeyValPair<byte[], byte[]> tuple)
{
+ switch (execMode) {
+ case UPDATEASYNC:
+ //asynchronous read-then-write needs request tracking, so it has its own method
+ updateAsync(tuple);
+ break;
+
+ case UPDATESYNC:
+ // The value returned by getSync is discarded; presumably only the cost of the read matters to the benchmark.
+ store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
+ insertValueToStore(tuple);
+ break;
+
+ default: //insert
+ insertValueToStore(tuple);
+ }
+ }
+
+ /**
+ * Derives the time argument for store calls from a key. The first 8 bytes of the key are read as a
+ * big-endian long lKey and reduced to lKey - (lKey % timeRange). For non-negative keys this is the start
+ * of the timeRange-wide bucket containing the key; for negative keys Java's % rounds toward zero instead.
+ * A key shorter than 8 bytes makes ByteBuffer.getLong throw BufferUnderflowException.
+ * @param key the raw tuple key
+ * @return the time bucket for the key
+ */
+ protected long getTimeByKey(byte[] key)
+ {
+ long lKey = ByteBuffer.wrap(key).getLong();
+ return lKey - (lKey % timeRange);
+ }
+
+ // Upper bound on outstanding async get requests. Beyond it, updateAsync() sleeps briefly so the
+ // pending-request queue cannot grow without limit and exhaust memory.
+ protected final int taskBarrier = 100000;
+
+ /**
+ * Issues an asynchronous get for the tuple's key, then writes back (via insertValueToStore) every
+ * previously issued request that has already completed, in submission order.
+ * <p>
+ * If more than taskBarrier requests are outstanding, the calling thread first sleeps for
+ * (queue size / taskBarrier) milliseconds to throttle the producer.
+ * NOTE(review): completed requests are only drained when a new tuple arrives; nothing in this method
+ * flushes requests that are still pending when a window ends.
+ * @param tuple the key/value pair to update; its key must hold at least 8 bytes (see getTimeByKey)
+ */
+ protected void updateAsync(KeyValPair<byte[], byte[]> tuple)
+ {
+ if (taskQueue.size() > taskBarrier) {
+ //slow down so that too many tasks do not pile up waiting.
+ long waitMillis = taskQueue.size() / taskBarrier;
+ logger.info("Queue Size: {}, wait time(milli-seconds): {}", taskQueue.size(), waitMillis);
+ try {
+ Thread.sleep(waitMillis);
+ } catch (InterruptedException e) {
+ // Do not swallow the interrupt: restore the flag so the operator thread can still be stopped.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ //Send the get request to the state manager. taskQueue keeps requests in submission order;
+ //taskToPair maps each request back to the tuple that has to be written once it completes.
+ Slice key = new Slice(tuple.getKey());
+ Future<Slice> newTask = store.getAsync(getTimeByKey(tuple.getKey()), key);
+ taskQueue.add(newTask);
+ taskToPair.put(newTask, tuple);
+
+ //Write back completed requests from the head of the queue, stopping at the first one still pending
+ //so that updates are applied in the order they were issued.
+ while (!taskQueue.isEmpty() && taskQueue.peek().isDone()) {
+ Future<Slice> task = taskQueue.poll();
+ insertValueToStore(taskToPair.remove(task));
+ }
+ }
+
+ /**
+ * Writes the tuple to the store and counts it for the throughput statistics.
+ * <p>
+ * For ManagedTimeUnifiedStateImpl the first argument of put, like that of getSync/getAsync, is a time.
+ * The key's time bucket (getTimeByKey) is passed, not bucketId. Passing bucketId (always 1) would put
+ * every write into time 1, a different time bucket from the one the update modes read.
+ * @param tuple the key/value pair to store; its key must hold at least 8 bytes (see getTimeByKey)
+ */
+ protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
+ {
Slice key = new Slice(tuple.getKey());
Slice value = new Slice(tuple.getValue());
+
-store.put(bucketId, key, value);
+ store.put(getTimeByKey(tuple.getKey()), key, value);
++tupleCount;
}
@@ -106,12 +199,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
logger.info("beforeCheckpoint {}", windowId);
}
- public ManagedStateImpl getStore()
+ /**
+ * @return the state store under benchmark
+ */
+ public ManagedTimeUnifiedStateImpl getStore()
{
return store;
}
- public void setStore(ManagedStateImpl store)
+ /**
+ * @param store the state store to benchmark; it is set up by setup()
+ */
+ public void setStore(ManagedTimeUnifiedStateImpl store)
{
this.store = store;
}
@@ -119,9 +212,45 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
+ /**
+ * Logs the time spent, the tuple count and the per-second rate since statisticsBeginTime,
+ * then restarts the measurement by resetting statisticsBeginTime and tupleCount.
+ */
protected void logStatistics()
{
long spentTime = System.currentTimeMillis() - statisticsBeginTime;
- logger.info("Time Spent: {}, Processed tuples: {}, rate: {}", spentTime, tupleCount, tupleCount / spentTime);
+ // spentTime is 0 if two calls fall within the same millisecond; long division by it would throw ArithmeticException.
+ long ratePerSecond = spentTime > 0 ? tupleCount * 1000 / spentTime : 0;
+ logger.info("Time Spent: {}, Processed tuples: {}, rate per second: {}", spentTime, tupleCount, ratePerSecond);
statisticsBeginTime = System.currentTimeMillis();
tupleCount = 0;
}
+
+ public ExecMode getExecMode()
+ {
+ return execMode;
+ }
+
+ public void setExecMode(ExecMode execMode)
+ {
+ this.execMode = execMode;
+ }
+
+ /**
+ * @return the name of the current execution mode (same value as getExecModeStr())
+ */
+ public String getExecModeString()
+ {
+ return getExecModeStr();
+ }
+
+ /**
+ * @return the name of the current execution mode; getter of the execModeStr property
+ */
+ public String getExecModeStr()
+ {
+ return execMode.name();
+ }
+
+ /**
+ * Sets the execution mode from its name, matched case-insensitively. This is the setter for the
+ * execModeStr property, e.g. dt.application.ManagedStateBenchmark.operator.Store.execModeStr.
+ * @param execModeStr one of INSERT, UPDATESYNC, UPDATEASYNC (any case)
+ * @throws IllegalArgumentException if the name matches no ExecMode, so a typo in the configuration
+ * is reported instead of silently keeping the previous mode
+ */
+ public void setExecModeStr(String execModeStr)
+ {
+ for (ExecMode em : ExecMode.values()) {
+ if (em.name().equalsIgnoreCase(execModeStr)) {
+ this.execMode = em;
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Unknown execMode '" + execModeStr + "', valid values are: "
+ + java.util.Arrays.toString(ExecMode.values()));
+ }
+
+ /**
+ * @deprecated misspelled property setter (exposes "exeModeStr", which the configuration does not use);
+ * use {@link #setExecModeStr(String)}.
+ */
+ @Deprecated
+ public void setExeModeStr(String execModeStr)
+ {
+ setExecModeStr(execModeStr);
+ }
+
+ public int getTimeRange()
+ {
+ return timeRange;
+ }
+
+ public void setTimeRange(int timeRange)
+ {
+ this.timeRange = timeRange;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml
index f6d0ffc..b6131e4 100644
--- a/benchmark/src/main/resources/META-INF/properties.xml
+++ b/benchmark/src/main/resources/META-INF/properties.xml
@@ -193,6 +193,11 @@
<name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.passwordMeta</name>
<value></value>
</property>
+ <property>
+ <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name>
<!-- valid values are: INSERT, UPDATESYNC, UPDATEASYNC (case-insensitive) -->
+ <value>INSERT</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
index ca5e245..93a7720 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -18,13 +18,18 @@
*/
package com.datatorrent.benchmark.state;
+import java.io.File;
+
+import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
/**
* This is not a really unit test, but in fact a benchmark runner.
@@ -34,9 +39,32 @@ import com.datatorrent.api.StreamingApplication;
public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
{
public static final String basePath = "target/temp";
-
+
+ /**
+ * Recursively deletes basePath ("target/temp") before each test so that a run does not see files left by a previous one.
+ */
+ @Before
+ public void before()
+ {
+ FileUtil.fullyDelete(new File(basePath));
+ }
+
+ // Runs the benchmark with the Store operator in UPDATESYNC mode (blocking get, then put).
+ @Test
+ public void testUpdateSync() throws Exception
+ {
+ test(ExecMode.UPDATESYNC);
+ }
+
+ // Runs the benchmark with the Store operator in UPDATEASYNC mode (async get, put on completion).
@Test
- public void test() throws Exception
+ public void testUpdateAsync() throws Exception
+ {
+ test(ExecMode.UPDATEASYNC);
+ }
+
+ // Runs the benchmark with the Store operator in INSERT mode (put only).
+ @Test
+ public void testInsert() throws Exception
+ {
+ test(ExecMode.INSERT);
+ }
+
+ public void test(ExecMode exeMode) throws Exception
{
Configuration conf = new Configuration(false);
@@ -44,7 +72,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
DAG dag = lma.getDAG();
super.populateDAG(dag, conf);
-
+ storeOperator.execMode = exeMode;
+
StreamingApplication app = new StreamingApplication()
{
@Override
@@ -62,6 +91,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
lc.shutdown();
}
+
+
@Override
public String getStoreBasePath(Configuration conf)
{