You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/07 01:42:09 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2339 #resolve #comment
Windowed Operator benchmarking
Repository: apex-malhar
Updated Branches:
refs/heads/master bb3dca1b4 -> 91767c589
APEXMALHAR-2339 #resolve #comment Windowed Operator benchmarking
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7e22686b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7e22686b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7e22686b
Branch: refs/heads/master
Commit: 7e22686b1041ad537e633a380db4ee9d7459435f
Parents: 70154f6
Author: brightchen <br...@datatorrent.com>
Authored: Wed Nov 16 11:25:26 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Jan 4 10:42:10 2017 -0800
----------------------------------------------------------------------
.../benchmark/window/AbstractGenerator.java | 86 ++++++++
.../AbstractWindowedOperatorBenchmarkApp.java | 215 +++++++++++++++++++
.../KeyedWindowedOperatorBenchmarkApp.java | 138 ++++++++++++
.../window/WindowedOperatorBenchmarkApp.java | 123 +++++++++++
.../KeyedWindowedOperatorBenchmarkAppTest.java | 75 +++++++
.../WindowedOperatorBenchmarkAppTest.java | 75 +++++++
benchmark/src/test/resources/log4j.properties | 2 +-
.../apex/malhar/lib/state/managed/Bucket.java | 14 +-
8 files changed, 722 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
new file mode 100644
index 0000000..c5b1594
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+public abstract class AbstractGenerator<T> extends BaseOperator implements InputOperator
+{
+ public final transient DefaultOutputPort<T> data = new DefaultOutputPort<T>();
+
+ protected int emitBatchSize = 1000;
+ protected byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
+ protected int rate = 20000;
+ protected int emitCount = 0;
+ protected final Random random = new Random();
+ protected int range = 1000 * 60; // one minute range of hot keys
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ emitCount = 0;
+ }
+
+ public int getEmitBatchSize()
+ {
+ return emitBatchSize;
+ }
+
+ public void setEmitBatchSize(int emitBatchSize)
+ {
+ this.emitBatchSize = emitBatchSize;
+ }
+
+ public int getRate()
+ {
+ return rate;
+ }
+
+ public void setRate(int rate)
+ {
+ this.rate = rate;
+ }
+
+ public int getRange()
+ {
+ return range;
+ }
+
+ public void setRange(int range)
+ {
+ this.range = range;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ for (int i = 0; i < emitBatchSize && emitCount < rate; i++) {
+ data.emit(generateNextTuple());
+ emitCount++;
+ }
+ }
+
+ protected abstract T generateNextTuple();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..09f7653
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGenerator;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.stream.DevNull;
+
+public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication
+{
+ protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath";
+ protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store";
+ protected static final int ALLOWED_LATENESS = 19000;
+
+ protected int timeRange = 1000 * 60; // one minute range of hot keys
+
+ protected Class<G> generatorClass;
+ protected Class<O> windowedOperatorClass;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TestStatsListener sl = new TestStatsListener();
+ sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
+
+ G generator = createGenerator();
+ dag.addOperator("Generator", generator);
+ //generator.setRange(timeRange);
+ dag.setAttribute(generator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+ O windowedOperator = createWindowedOperator(conf);
+ dag.addOperator("windowedOperator", windowedOperator);
+ dag.setAttribute(windowedOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+ //dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+ connectGeneratorToWindowedOperator(dag, generator, windowedOperator);
+
+// WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
+// dag.addOperator("WatermarkGenerator", watermarkGenerator);
+// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL);
+
+ DevNull output = dag.addOperator("output", new DevNull());
+ dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ protected G createGenerator()
+ {
+ try {
+ return generatorClass.newInstance();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected O createWindowedOperator(Configuration conf)
+ {
+ SpillableStateStore store = createStore(conf);
+ try {
+ O windowedOperator = this.windowedOperatorClass.newInstance();
+ SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
+ windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+
+ windowedOperator.setDataStorage(createDataStorage(sccImpl));
+ windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl));
+ windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
+ windowedOperator.setAccumulation(createAccumulation());
+
+ windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
+ windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
+ //accumulating mode
+ windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
+ windowedOperator.setFixedWatermark(30000);
+ //windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
+
+ return windowedOperator;
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl);
+
+ protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl);
+
+ protected abstract Accumulation createAccumulation();
+
+ protected abstract void connectGeneratorToWindowedOperator(DAG dag, G generator, O windowedOperator);
+
+ protected SpillableStateStore createStore(Configuration conf)
+ {
+ String basePath = getStoreBasePath(conf);
+ ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore();
+ store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
+ ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+
+ return store;
+ }
+
+ public String getStoreBasePath(Configuration conf)
+ {
+ String basePath = conf.get(PROP_STORE_PATH);
+ if (basePath == null || basePath.isEmpty()) {
+ basePath = DEFAULT_BASE_PATH;
+ }
+ return basePath;
+ }
+
+
+ public static class TestStatsListener implements StatsListener, Serializable
+ {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);
+ private static final long serialVersionUID = 1L;
+ SetPropertyRequest cmd = new SetPropertyRequest();
+
+ long uwId;
+ long dwId;
+ long resumewid;
+ int rate;
+ int queueSize;
+ boolean adjustRate;
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ if (!stats.getLastWindowedStats().isEmpty()) {
+ OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+ if (os.inputPorts != null && !os.inputPorts.isEmpty()) {
+ dwId = os.windowId;
+ queueSize = os.inputPorts.get(0).queueSize;
+ if (uwId - dwId < 5) {
+ // keep operator busy
+ rate = Math.max(1000, rate);
+ rate += rate / 10;
+ } else if (uwId - dwId > 20) {
+ // operator is behind
+ if (resumewid < dwId) {
+ resumewid = uwId - 15;
+ rate -= rate / 10;
+ }
+ }
+ } else {
+ //LOG.debug("uwid-dwid {} skip {} rate {}, queueSize {}", uwId - dwId, resumewid - dwId, rate, queueSize);
+ // upstream operator
+ uwId = os.windowId;
+ if (adjustRate) {
+ Response rsp = new Response();
+ cmd.rate = resumewid < dwId ? rate : 0;
+ rsp.operatorRequests = Lists.newArrayList(cmd);
+ return rsp;
+ }
+ }
+ }
+ return null;
+ }
+
+ public static class SetPropertyRequest implements OperatorRequest, Serializable
+ {
+ private static final long serialVersionUID = 1L;
+ int rate;
+
+ @Override
+ public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException
+ {
+ if (oper instanceof WindowedGenerator) {
+ LOG.debug("Setting rate to {}", rate);
+ ((WindowedGenerator)oper).rate = rate;
+ }
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..7b2085d
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
+{
+ public KeyedWindowedOperatorBenchmarkApp()
+ {
+ generatorClass = KeyedWindowedGenerator.class;
+ windowedOperatorClass = KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator.class;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void connectGeneratorToWindowedOperator(DAG dag, KeyedWindowedGenerator generator,
+ KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator windowedOperator)
+ {
+ dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl
+ {
+ private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class);
+
+ private long logWindows = 20;
+ private long windowCount = 0;
+ private long beginTime = System.currentTimeMillis();
+ private long tupleCount = 0;
+ private long totalBeginTime = System.currentTimeMillis();
+ private long totalCount = 0;
+
+ private long droppedCount = 0;
+ @Override
+ public void dropTuple(Tuple input)
+ {
+ droppedCount++;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ if (++windowCount == logWindows) {
+ long endTime = System.currentTimeMillis();
+ tupleCount -= droppedCount;
+ totalCount += tupleCount;
+ logger.info("total: count: {}; time: {}; average: {}; period: count: {}; dropped: {}; time: {}; average: {}",
+ totalCount, endTime - totalBeginTime, totalCount * 1000 / (endTime - totalBeginTime),
+ tupleCount, droppedCount, endTime - beginTime, tupleCount * 1000 / (endTime - beginTime));
+ windowCount = 0;
+ beginTime = System.currentTimeMillis();
+ tupleCount = 0;
+ droppedCount = 0;
+ }
+ }
+
+ @Override
+ public void processTuple(Tuple tuple)
+ {
+ super.processTuple(tuple);
+ ++tupleCount;
+ }
+ }
+
+ protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
+ {
+ @Override
+ protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple()
+ {
+ return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(System.currentTimeMillis() - random.nextInt(60000),
+ new KeyValPair<String, Long>("" + random.nextInt(100000), (long)random.nextInt(100)));
+ }
+ }
+
+ @Override
+ protected Accumulation createAccumulation()
+ {
+ return new Count();
+ }
+
+ private boolean useInMemoryStorage = false;
+
+ @Override
+ protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl)
+ {
+ if (useInMemoryStorage) {
+ return new InMemoryWindowedKeyedStorage();
+ }
+ SpillableWindowedKeyedStorage dataStorage = new SpillableWindowedKeyedStorage();
+ dataStorage.setSpillableComplexComponent(sccImpl);
+ return dataStorage;
+ }
+
+ @Override
+ protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
+ {
+ if (useInMemoryStorage) {
+ return new InMemoryWindowedKeyedStorage();
+ }
+ SpillableWindowedKeyedStorage retractionStorage = new SpillableWindowedKeyedStorage();
+ retractionStorage.setSpillableComplexComponent(sccImpl);
+ return retractionStorage;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..98275ce
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name = "WindowedOperatorBenchmark")
+public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
+{
+ public WindowedOperatorBenchmarkApp()
+ {
+ generatorClass = WindowedGenerator.class;
+ windowedOperatorClass = WindowedOperatorBenchmarkApp.MyWindowedOperator.class;
+ }
+
+
+ protected static class WindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<Long>>
+ {
+ @Override
+ protected TimestampedTuple<Long> generateNextTuple()
+ {
+ return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100));
+ }
+ }
+
+ protected static class MyWindowedOperator extends WindowedOperatorImpl
+ {
+ private static final Logger logger = LoggerFactory.getLogger(MyWindowedOperator.class);
+
+ private long logWindows = 20;
+ private long windowCount = 0;
+ private long beginTime = System.currentTimeMillis();
+ private long tupleCount = 0;
+ private long totalBeginTime = System.currentTimeMillis();
+ private long totalCount = 0;
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ if (++windowCount == logWindows) {
+ long endTime = System.currentTimeMillis();
+ totalCount += tupleCount;
+ logger.info("total: count: {}; time: {}; average: {}; period: count: {}; time: {}; average: {}",
+ totalCount, endTime - totalBeginTime, totalCount * 1000 / (endTime - totalBeginTime),
+ tupleCount, endTime - beginTime, tupleCount * 1000 / (endTime - beginTime));
+ windowCount = 0;
+ beginTime = System.currentTimeMillis();
+ tupleCount = 0;
+ }
+ }
+
+ @Override
+ public void processTuple(Tuple tuple)
+ {
+ super.processTuple(tuple);
+ ++tupleCount;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void connectGeneratorToWindowedOperator(DAG dag, WindowedGenerator generator,
+ MyWindowedOperator windowedOperator)
+ {
+ dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+
+ @Override
+ protected Accumulation createAccumulation()
+ {
+ return new Count();
+ }
+
+
+ @Override
+ protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl)
+ {
+ SpillableWindowedPlainStorage plainDataStorage = new SpillableWindowedPlainStorage();
+ plainDataStorage.setSpillableComplexComponent(sccImpl);
+ return plainDataStorage;
+ }
+
+
+ @Override
+ protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
+ {
+ SpillableWindowedPlainStorage plainRetractionStorage = new SpillableWindowedPlainStorage();
+ plainRetractionStorage.setSpillableComplexComponent(sccImpl);
+ return plainRetractionStorage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
new file mode 100644
index 0000000..2bc9335
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+public class KeyedWindowedOperatorBenchmarkAppTest extends KeyedWindowedOperatorBenchmarkApp
+{
+ public static final String basePath = "target/temp";
+
+ @Before
+ public void before()
+ {
+ FileUtil.fullyDelete(new File(basePath));
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ super.populateDAG(dag, conf);
+
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.run(3000000);
+
+ lc.shutdown();
+ }
+
+ @Override
+ public String getStoreBasePath(Configuration conf)
+ {
+ return basePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
new file mode 100644
index 0000000..4a16396
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+public class WindowedOperatorBenchmarkAppTest extends WindowedOperatorBenchmarkApp
+{
+ public static final String basePath = "target/temp";
+
+ @Before
+ public void before()
+ {
+ FileUtil.fullyDelete(new File(basePath));
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ super.populateDAG(dag, conf);
+
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.run(3000000);
+
+ lc.shutdown();
+ }
+
+ @Override
+ public String getStoreBasePath(Configuration conf)
+ {
+ return basePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index 3fc0120..92e48b7 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -23,7 +23,7 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
+test.log.console.threshold=INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 3c18b2f..4f2cefd 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -302,7 +302,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
if (timeBucket != -1) {
BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket);
if (bucketedValue != null) {
- if (timeBucket == cachedBucketMetas.firstKey()) {
+ if (!cachedBucketMetas.isEmpty() && timeBucket == cachedBucketMetas.firstKey()) {
//if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
//Since the size of the whole time-bucket is added to total size, there is no need to add the size of
//entries in file cache.
@@ -493,9 +493,11 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
memoryFreed += originSize - (keyStream.size() + valueStream.size());
}
- //release the free memory immediately
- keyStream.releaseAllFreeMemory();
- valueStream.releaseAllFreeMemory();
+ if (memoryFreed > 0) {
+ //release the free memory immediately
+ keyStream.releaseAllFreeMemory();
+ valueStream.releaseAllFreeMemory();
+ }
return memoryFreed;
}
@@ -540,7 +542,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
//so it will be re-written by BucketsDataManager
try {
BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket());
- memoryFreed += tbm.getSizeInBytes();
+ if (tbm != null) {
+ memoryFreed += tbm.getSizeInBytes();
+ }
LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
reader.close();
} catch (IOException e) {
[2/2] apex-malhar git commit: Merge commit 'refs/pull/502/head' of
github.com:apache/apex-malhar
Posted by da...@apache.org.
Merge commit 'refs/pull/502/head' of github.com:apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/91767c58
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/91767c58
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/91767c58
Branch: refs/heads/master
Commit: 91767c5891929b00577f7d9596ec427df1eb881d
Parents: bb3dca1 7e22686
Author: David Yan <da...@apache.org>
Authored: Fri Jan 6 15:42:09 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Fri Jan 6 15:42:09 2017 -0800
----------------------------------------------------------------------
.../benchmark/window/AbstractGenerator.java | 86 ++++++++
.../AbstractWindowedOperatorBenchmarkApp.java | 215 +++++++++++++++++++
.../KeyedWindowedOperatorBenchmarkApp.java | 138 ++++++++++++
.../window/WindowedOperatorBenchmarkApp.java | 123 +++++++++++
.../KeyedWindowedOperatorBenchmarkAppTest.java | 75 +++++++
.../WindowedOperatorBenchmarkAppTest.java | 75 +++++++
benchmark/src/test/resources/log4j.properties | 2 +-
.../apex/malhar/lib/state/managed/Bucket.java | 14 +-
8 files changed, 722 insertions(+), 6 deletions(-)
----------------------------------------------------------------------