You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 23:26:06 UTC
apex-malhar git commit: APEXMALHAR-2205 #resolve #comment State
management benchmark
Repository: apex-malhar
Updated Branches:
refs/heads/master f006ac6f5 -> c5a12e4e7
APEXMALHAR-2205 #resolve #comment State management benchmark
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5a12e4e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5a12e4e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5a12e4e
Branch: refs/heads/master
Commit: c5a12e4e747c5be840a16e4c932cbc1dbff79894
Parents: f006ac6
Author: brightchen <br...@datatorrent.com>
Authored: Fri Aug 26 16:09:12 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Thu Sep 1 15:00:29 2016 -0700
----------------------------------------------------------------------
benchmark/pom.xml | 5 +
.../state/ManagedStateBenchmarkApp.java | 215 +++++++++++++++++++
.../benchmark/state/StoreOperator.java | 127 +++++++++++
.../state/ManagedStateBenchmarkAppTester.java | 70 ++++++
4 files changed, 417 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index f09ae81..d5451b9 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -595,6 +595,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.4</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
new file mode 100644
index 0000000..25e3971
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.KeyValPair;
+
+@ApplicationAnnotation(name = "ManagedStateBenchmark")
+public class ManagedStateBenchmarkApp implements StreamingApplication
+{
+ private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class);
+
+ protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath";
+ protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store";
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TestStatsListener sl = new TestStatsListener();
+ sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
+ TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+ dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+ StoreOperator storeOperator = new StoreOperator();
+ storeOperator.setStore(createStore(conf));
+ StoreOperator store = dag.addOperator("Store", storeOperator);
+
+ dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+ dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ public ManagedStateImpl createStore(Configuration conf)
+ {
+ String basePath = getStoreBasePath(conf);
+ ManagedStateImpl store = new ManagedStateImpl();
+ ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+ return store;
+ }
+
+ public String getStoreBasePath(Configuration conf)
+ {
+
+ String basePath = conf.get(PROP_STORE_PATH);
+ if (basePath == null || basePath.isEmpty()) {
+ basePath = DEFAULT_BASE_PATH;
+ }
+ return basePath;
+ }
+
+ public static class TestGenerator extends BaseOperator implements InputOperator
+ {
+ public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+ int emitBatchSize = 1000;
+ byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
+ int rate = 20000;
+ int emitCount = 0;
+ private final Random random = new Random();
+ private int range = 1000 * 60; // one minute range of hot keys
+
+ public int getEmitBatchSize()
+ {
+ return emitBatchSize;
+ }
+
+ public void setEmitBatchSize(int emitBatchSize)
+ {
+ this.emitBatchSize = emitBatchSize;
+ }
+
+ public int getRate()
+ {
+ return rate;
+ }
+
+ public void setRate(int rate)
+ {
+ this.rate = rate;
+ }
+
+ public int getRange()
+ {
+ return range;
+ }
+
+ public void setRange(int range)
+ {
+ this.range = range;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ emitCount = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < emitBatchSize && emitCount < rate; i++) {
+ byte[] key = ByteBuffer.allocate(16).putLong((timestamp - timestamp % range) + random.nextInt(range)).putLong(i)
+ .array();
+ data.emit(new KeyValPair<byte[], byte[]>(key, val));
+ emitCount++;
+ }
+ }
+ }
+
+ public static class TestStatsListener implements StatsListener, Serializable
+ {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);
+ private static final long serialVersionUID = 1L;
+ SetPropertyRequest cmd = new SetPropertyRequest();
+
+ long uwId;
+ long dwId;
+ long resumewid;
+ int rate;
+ int queueSize;
+ boolean adjustRate;
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ if (!stats.getLastWindowedStats().isEmpty()) {
+ OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+ if (os.inputPorts != null && !os.inputPorts.isEmpty()) {
+ dwId = os.windowId;
+ queueSize = os.inputPorts.get(0).queueSize;
+ if (uwId - dwId < 5) {
+ // keep operator busy
+ rate = Math.max(1000, rate);
+ rate += rate / 10;
+ } else if (uwId - dwId > 20) {
+ // operator is behind
+ if (resumewid < dwId) {
+ resumewid = uwId - 15;
+ rate -= rate / 10;
+ }
+ }
+ } else {
+ LOG.debug("uwid-dwid {} skip {} rate {}, queueSize {}", uwId - dwId, resumewid - dwId, rate, queueSize);
+ // upstream operator
+ uwId = os.windowId;
+ if (adjustRate) {
+ Response rsp = new Response();
+ cmd.rate = resumewid < dwId ? rate : 0;
+ rsp.operatorRequests = Lists.newArrayList(cmd);
+ return rsp;
+ }
+ }
+ }
+ return null;
+ }
+
+ public static class SetPropertyRequest implements OperatorRequest, Serializable
+ {
+ private static final long serialVersionUID = 1L;
+ int rate;
+
+ @Override
+ public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException
+ {
+ if (oper instanceof TestGenerator) {
+ LOG.debug("Setting rate to {}", rate);
+ ((TestGenerator)oper).rate = rate;
+ }
+ return null;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
new file mode 100644
index 0000000..0d9c42b
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.Slice;
+
+public class StoreOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
+
+ protected static final int numOfWindowPerStatistics = 10;
+
+ protected ManagedStateImpl store;
+ protected long bucketId = 1;
+
+ protected long lastCheckPointWindowId = -1;
+ protected long currentWindowId;
+ protected long tupleCount = 0;
+ protected int windowCountPerStatistics = 0;
+ protected long statisticsBeginTime = 0;
+
+ public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+ {
+ @Override
+ public void process(KeyValPair<byte[], byte[]> tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ store.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ store.beginWindow(windowId);
+ if (statisticsBeginTime <= 0) {
+ statisticsBeginTime = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ store.endWindow();
+ if (++windowCountPerStatistics >= numOfWindowPerStatistics) {
+ logStatistics();
+ windowCountPerStatistics = 0;
+ }
+ }
+
+ protected void processTuple(KeyValPair<byte[], byte[]> tuple)
+ {
+ Slice key = new Slice(tuple.getKey());
+ Slice value = new Slice(tuple.getValue());
+ store.put(bucketId, key, value);
+ ++tupleCount;
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ store.committed(windowId);
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ store.beforeCheckpoint(windowId);
+ logger.info("beforeCheckpoint {}", windowId);
+ }
+
+ public ManagedStateImpl getStore()
+ {
+ return store;
+ }
+
+ public void setStore(ManagedStateImpl store)
+ {
+ this.store = store;
+ }
+
+ protected void logStatistics()
+ {
+ long spentTime = System.currentTimeMillis() - statisticsBeginTime;
+ logger.info("Time Spent: {}, Processed tuples: {}, rate: {}", spentTime, tupleCount, tupleCount / spentTime);
+
+ statisticsBeginTime = System.currentTimeMillis();
+ tupleCount = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
new file mode 100644
index 0000000..ca5e245
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Benchmark runner rather than a real unit test.
+ * It exists so that developers can conveniently run the managed state benchmark in local mode
+ * from an IDE.
+ */
+public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+
+  /**
+   * Builds the benchmark DAG in local mode and runs it for 300000 milliseconds.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    final Configuration configuration = new Configuration(false);
+    final LocalMode localMode = LocalMode.newInstance();
+
+    // Populate the local-mode DAG directly with the benchmark topology.
+    final DAG benchmarkDag = localMode.getDAG();
+    super.populateDAG(benchmarkDag, configuration);
+
+    // The DAG is already populated above, so the application passed to prepareDAG adds nothing.
+    localMode.prepareDAG(new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    }, configuration);
+
+    // Create local cluster
+    final LocalMode.Controller controller = localMode.getController();
+    controller.run(300000);
+
+    controller.shutdown();
+  }
+
+  /**
+   * Always stores below {@link #basePath}, regardless of the configuration.
+   */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}