You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/07 01:42:09 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2339 #resolve #comment Windowed Operator benchmarking

Repository: apex-malhar
Updated Branches:
  refs/heads/master bb3dca1b4 -> 91767c589


APEXMALHAR-2339 #resolve #comment Windowed Operator benchmarking


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7e22686b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7e22686b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7e22686b

Branch: refs/heads/master
Commit: 7e22686b1041ad537e633a380db4ee9d7459435f
Parents: 70154f6
Author: brightchen <br...@datatorrent.com>
Authored: Wed Nov 16 11:25:26 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Wed Jan 4 10:42:10 2017 -0800

----------------------------------------------------------------------
 .../benchmark/window/AbstractGenerator.java     |  86 ++++++++
 .../AbstractWindowedOperatorBenchmarkApp.java   | 215 +++++++++++++++++++
 .../KeyedWindowedOperatorBenchmarkApp.java      | 138 ++++++++++++
 .../window/WindowedOperatorBenchmarkApp.java    | 123 +++++++++++
 .../KeyedWindowedOperatorBenchmarkAppTest.java  |  75 +++++++
 .../WindowedOperatorBenchmarkAppTest.java       |  75 +++++++
 benchmark/src/test/resources/log4j.properties   |   2 +-
 .../apex/malhar/lib/state/managed/Bucket.java   |  14 +-
 8 files changed, 722 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
new file mode 100644
index 0000000..c5b1594
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractGenerator.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Base input operator for the windowed operator benchmarks.
+ * <p>
+ * Each call to {@link #emitTuples()} emits up to {@link #emitBatchSize} tuples produced by
+ * {@link #generateNextTuple()}, and at most {@link #rate} tuples are emitted per streaming window
+ * (the counter is reset in {@link #beginWindow(long)}).
+ *
+ * @param <T> type of the generated tuples
+ */
+public abstract class AbstractGenerator<T> extends BaseOperator implements InputOperator
+{
+  /** Port on which the generated tuples are emitted. */
+  public final transient DefaultOutputPort<T> data = new DefaultOutputPort<T>();
+
+  /** Upper bound on tuples emitted per {@link #emitTuples()} call. */
+  protected int emitBatchSize = 1000;
+  /** 1000-byte payload whose first 8 bytes hold 1234L; not read by this class. */
+  protected byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
+  /** Upper bound on tuples emitted per streaming window (a per-window cap, not a per-second rate). */
+  protected int rate = 20000;
+  /** Tuples emitted so far in the current streaming window. */
+  protected int emitCount = 0;
+  /** Source of randomness for subclasses generating tuples. */
+  protected final Random random = new Random();
+  /** Range value exposed to subclasses; not read by this class. */
+  protected int range = 1000 * 60; // one minute range of hot keys
+
+  /**
+   * Resets the per-window emit counter so up to {@link #rate} tuples can be emitted in this window.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    emitCount = 0;
+  }
+
+  /**
+   * Emits one batch, stopping after {@link #emitBatchSize} tuples or once the per-window
+   * {@link #rate} has been reached, whichever comes first.
+   */
+  @Override
+  public void emitTuples()
+  {
+    int emittedInBatch = 0;
+    while (emittedInBatch < emitBatchSize && emitCount < rate) {
+      T next = generateNextTuple();
+      data.emit(next);
+      emitCount++;
+      emittedInBatch++;
+    }
+  }
+
+  /**
+   * Produces the next tuple to emit on {@link #data}.
+   *
+   * @return the tuple to emit
+   */
+  protected abstract T generateNextTuple();
+
+  public int getEmitBatchSize()
+  {
+    return emitBatchSize;
+  }
+
+  public void setEmitBatchSize(int emitBatchSize)
+  {
+    this.emitBatchSize = emitBatchSize;
+  }
+
+  public int getRate()
+  {
+    return rate;
+  }
+
+  public void setRate(int rate)
+  {
+    this.rate = rate;
+  }
+
+  public int getRange()
+  {
+    return range;
+  }
+
+  public void setRange(int range)
+  {
+    this.range = range;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..09f7653
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGenerator;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.stream.DevNull;
+
+/**
+ * Base application for benchmarking windowed operators.
+ * <p>
+ * The DAG is: generator -> windowed operator -> {@link DevNull}. Subclasses choose the generator and
+ * operator classes (instantiated reflectively through their no-arg constructors), the data and
+ * retraction storage, the accumulation, and how the generator is connected to the operator.
+ *
+ * @param <G> generator operator type
+ * @param <O> windowed operator type
+ */
+public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication
+{
+  protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath";
+  protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store";
+  protected static final int ALLOWED_LATENESS = 19000;
+  /** Boolean property that enables rate adjustment by {@link TestStatsListener}. */
+  protected static final String PROP_ADJUST_RATE = "dt.ManagedStateBenchmark.adjustRate";
+
+  protected int timeRange = 1000 * 60; // one minute range of hot keys
+
+  protected Class<G> generatorClass;
+  protected Class<O> windowedOperatorClass;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // the same listener object is registered on both operators so it can compare their window ids
+    TestStatsListener sl = new TestStatsListener();
+    sl.adjustRate = conf.getBoolean(PROP_ADJUST_RATE, false);
+
+    G generator = createGenerator();
+    dag.addOperator("Generator", generator);
+    dag.setAttribute(generator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+    O windowedOperator = createWindowedOperator(conf);
+    dag.addOperator("windowedOperator", windowedOperator);
+    dag.setAttribute(windowedOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+    connectGeneratorToWindowedOperator(dag, generator, windowedOperator);
+
+    // no watermark generator is attached: createWindowedOperator sets a fixed watermark instead
+    DevNull output = dag.addOperator("output", new DevNull());
+    dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Instantiates the generator from {@link #generatorClass} through its no-arg constructor.
+   */
+  protected G createGenerator()
+  {
+    try {
+      return generatorClass.newInstance();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Instantiates the windowed operator from {@link #windowedOperatorClass} and configures its storage
+   * (backed by the store from {@link #createStore(Configuration)}), accumulation, one-minute time
+   * windows, allowed lateness, trigger and a fixed watermark.
+   */
+  protected O createWindowedOperator(Configuration conf)
+  {
+    SpillableStateStore store = createStore(conf);
+    try {
+      O windowedOperator = this.windowedOperatorClass.newInstance();
+      SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
+      windowedOperator.addComponent("SpillableComplexComponent", sccImpl);
+
+      windowedOperator.setDataStorage(createDataStorage(sccImpl));
+      windowedOperator.setRetractionStorage(createRetractionStorage(sccImpl));
+      windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage());
+      windowedOperator.setAccumulation(createAccumulation());
+
+      windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
+      windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
+      // accumulating mode: fire at the watermark, with early firings every second, for updated panes only
+      windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
+      windowedOperator.setFixedWatermark(30000);
+
+      return windowedOperator;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  protected abstract WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl);
+
+  protected abstract WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl);
+
+  protected abstract Accumulation createAccumulation();
+
+  protected abstract void connectGeneratorToWindowedOperator(DAG dag, G generator, O windowedOperator);
+
+  /**
+   * Creates a time-bucketed managed spillable store (10 second bucket span) rooted at
+   * {@link #getStoreBasePath(Configuration)}.
+   */
+  protected SpillableStateStore createStore(Configuration conf)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedTimeUnifiedStateSpillableStateStore store = new ManagedTimeUnifiedStateSpillableStateStore();
+    store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000));
+    // assumes the store's default file access is a TFileImpl.DTFileImpl (the cast fails otherwise)
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+
+    return store;
+  }
+
+  /**
+   * Returns the {@link #PROP_STORE_PATH} configuration value, or {@link #DEFAULT_BASE_PATH} when that
+   * value is unset or empty.
+   */
+  public String getStoreBasePath(Configuration conf)
+  {
+    String basePath = conf.get(PROP_STORE_PATH);
+    if (basePath == null || basePath.isEmpty()) {
+      basePath = DEFAULT_BASE_PATH;
+    }
+    return basePath;
+  }
+
+  /**
+   * Stats listener that tracks the latest window ids of the upstream operator (no input ports) and the
+   * downstream operator. When {@code adjustRate} is set, it asks the generator to raise its per-window
+   * rate while the downstream operator keeps up, and to lower or pause it when the operator falls behind.
+   */
+  public static class TestStatsListener implements StatsListener, Serializable
+  {
+    private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);
+    private static final long serialVersionUID = 1L;
+    SetPropertyRequest cmd = new SetPropertyRequest();
+
+    long uwId;
+    long dwId;
+    long resumewid;
+    int rate;
+    int queueSize;
+    boolean adjustRate;
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      if (!stats.getLastWindowedStats().isEmpty()) {
+        OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+        if (os.inputPorts != null && !os.inputPorts.isEmpty()) {
+          // downstream operator
+          dwId = os.windowId;
+          queueSize = os.inputPorts.get(0).queueSize;
+          if (uwId - dwId < 5) {
+            // keep operator busy: grow by 10%, saturating instead of overflowing int
+            rate = Math.max(1000, rate);
+            rate = (int)Math.min(Integer.MAX_VALUE, (long)rate + rate / 10);
+          } else if (uwId - dwId > 20) {
+            // operator is behind
+            if (resumewid < dwId) {
+              resumewid = uwId - 15;
+              rate -= rate / 10;
+            }
+          }
+        } else {
+          // upstream operator
+          uwId = os.windowId;
+          if (adjustRate) {
+            Response rsp = new Response();
+            // a rate of 0 pauses the generator until the downstream window id passes resumewid
+            cmd.rate = resumewid < dwId ? rate : 0;
+            rsp.operatorRequests = Lists.newArrayList(cmd);
+            return rsp;
+          }
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Operator request that sets the per-window rate of any {@link AbstractGenerator}; other operators
+     * are left untouched.
+     */
+    public static class SetPropertyRequest implements OperatorRequest, Serializable
+    {
+      private static final long serialVersionUID = 1L;
+      int rate;
+
+      @Override
+      public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException
+      {
+        // match the common base class so both the keyed and the non-keyed generators are adjusted
+        if (oper instanceof AbstractGenerator) {
+          LOG.debug("Setting rate to {}", rate);
+          ((AbstractGenerator<?>)oper).rate = rate;
+        }
+        return null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..7b2085d
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Benchmark for {@link KeyedWindowedOperatorImpl} with a {@link Count} accumulation, storing window and
+ * retraction data in spillable keyed storage (or in-memory keyed storage if useInMemoryStorage is set).
+ */
+public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
+{
+  /** When true, in-memory keyed storage replaces spillable storage; nothing in this class sets it. */
+  private boolean useInMemoryStorage = false;
+
+  public KeyedWindowedOperatorBenchmarkApp()
+  {
+    generatorClass = KeyedWindowedGenerator.class;
+    windowedOperatorClass = KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator.class;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void connectGeneratorToWindowedOperator(DAG dag, KeyedWindowedGenerator generator,
+      KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator windowedOperator)
+  {
+    dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Keyed windowed operator that logs throughput every {@code logWindows} streaming windows. Tuples passed
+   * to {@link #dropTuple(Tuple)} (presumably late ones) are counted and excluded from the reported counts.
+   */
+  protected static class MyKeyedWindowedOperator extends KeyedWindowedOperatorImpl
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MyKeyedWindowedOperator.class);
+
+    private long logWindows = 20;
+    private long windowCount = 0;
+    private long beginTime = System.currentTimeMillis();
+    private long tupleCount = 0;
+    private long totalBeginTime = System.currentTimeMillis();
+    private long totalCount = 0;
+
+    private long droppedCount = 0;
+
+    @Override
+    public void dropTuple(Tuple input)
+    {
+      droppedCount++;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      if (++windowCount == logWindows) {
+        long endTime = System.currentTimeMillis();
+        tupleCount -= droppedCount;
+        totalCount += tupleCount;
+        // clamp elapsed times to at least 1 ms so a very fast period cannot divide by zero
+        long totalElapsed = Math.max(1, endTime - totalBeginTime);
+        long periodElapsed = Math.max(1, endTime - beginTime);
+        logger.info("total: count: {}; time: {}; average: {}; period: count: {}; dropped: {}; time: {}; average: {}",
+            totalCount, endTime - totalBeginTime, totalCount * 1000 / totalElapsed,
+            tupleCount, droppedCount, endTime - beginTime, tupleCount * 1000 / periodElapsed);
+        windowCount = 0;
+        beginTime = System.currentTimeMillis();
+        tupleCount = 0;
+        droppedCount = 0;
+      }
+    }
+
+    @Override
+    public void processTuple(Tuple tuple)
+    {
+      super.processTuple(tuple);
+      ++tupleCount;
+    }
+  }
+
+  /**
+   * Generates tuples keyed by one of 100000 strings, with a value in [0, 100) and a timestamp less than
+   * {@link #getRange()} milliseconds (one minute by default) in the past.
+   */
+  protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
+  {
+    @Override
+    protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple()
+    {
+      // use the configurable range (default 60000 ms) so setRange() takes effect
+      return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(System.currentTimeMillis() - random.nextInt(range),
+          new KeyValPair<String, Long>("" + random.nextInt(100000), (long)random.nextInt(100)));
+    }
+  }
+
+  @Override
+  protected Accumulation createAccumulation()
+  {
+    return new Count();
+  }
+
+  @Override
+  protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    return createKeyedStorage(sccImpl);
+  }
+
+  @Override
+  protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    return createKeyedStorage(sccImpl);
+  }
+
+  /** Creates new in-memory keyed storage, or spillable keyed storage backed by {@code sccImpl}. */
+  private WindowedStorage createKeyedStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    if (useInMemoryStorage) {
+      return new InMemoryWindowedKeyedStorage();
+    }
+    SpillableWindowedKeyedStorage storage = new SpillableWindowedKeyedStorage();
+    storage.setSpillableComplexComponent(sccImpl);
+    return storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
new file mode 100644
index 0000000..98275ce
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Tuple.TimestampedTuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Benchmark for the non-keyed {@link WindowedOperatorImpl} with a {@link Count} accumulation, storing
+ * window and retraction data in spillable plain storage.
+ */
+@ApplicationAnnotation(name = "WindowedOperatorBenchmark")
+public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
+{
+  public WindowedOperatorBenchmarkApp()
+  {
+    generatorClass = WindowedGenerator.class;
+    windowedOperatorClass = WindowedOperatorBenchmarkApp.MyWindowedOperator.class;
+  }
+
+  /**
+   * Generates values in [0, 100) with a timestamp less than two minutes in the past (a fixed span; the
+   * inherited range property is not used here).
+   */
+  protected static class WindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<Long>>
+  {
+    @Override
+    protected TimestampedTuple<Long> generateNextTuple()
+    {
+      return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100));
+    }
+  }
+
+  /**
+   * Windowed operator that logs throughput every {@code logWindows} streaming windows. Tuples passed to
+   * {@link #dropTuple(Tuple)} are counted and excluded from the reported counts, as in the keyed
+   * benchmark, so both benchmarks report comparable numbers.
+   */
+  protected static class MyWindowedOperator extends WindowedOperatorImpl
+  {
+    private static final Logger logger = LoggerFactory.getLogger(MyWindowedOperator.class);
+
+    private long logWindows = 20;
+    private long windowCount = 0;
+    private long beginTime = System.currentTimeMillis();
+    private long tupleCount = 0;
+    private long totalBeginTime = System.currentTimeMillis();
+    private long totalCount = 0;
+    private long droppedCount = 0;
+
+    @Override
+    public void dropTuple(Tuple input)
+    {
+      super.dropTuple(input);
+      droppedCount++;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      if (++windowCount == logWindows) {
+        long endTime = System.currentTimeMillis();
+        tupleCount -= droppedCount;
+        totalCount += tupleCount;
+        // clamp elapsed times to at least 1 ms so a very fast period cannot divide by zero
+        long totalElapsed = Math.max(1, endTime - totalBeginTime);
+        long periodElapsed = Math.max(1, endTime - beginTime);
+        logger.info("total: count: {}; time: {}; average: {}; period: count: {}; dropped: {}; time: {}; average: {}",
+            totalCount, endTime - totalBeginTime, totalCount * 1000 / totalElapsed,
+            tupleCount, droppedCount, endTime - beginTime, tupleCount * 1000 / periodElapsed);
+        windowCount = 0;
+        beginTime = System.currentTimeMillis();
+        tupleCount = 0;
+        droppedCount = 0;
+      }
+    }
+
+    @Override
+    public void processTuple(Tuple tuple)
+    {
+      super.processTuple(tuple);
+      ++tupleCount;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void connectGeneratorToWindowedOperator(DAG dag, WindowedGenerator generator,
+      MyWindowedOperator windowedOperator)
+  {
+    dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  @Override
+  protected Accumulation createAccumulation()
+  {
+    return new Count();
+  }
+
+  @Override
+  protected WindowedStorage createDataStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    return createPlainStorage(sccImpl);
+  }
+
+  @Override
+  protected WindowedStorage createRetractionStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    return createPlainStorage(sccImpl);
+  }
+
+  /** Creates a new spillable plain storage backed by {@code sccImpl}. */
+  private WindowedStorage createPlainStorage(SpillableComplexComponentImpl sccImpl)
+  {
+    SpillableWindowedPlainStorage storage = new SpillableWindowedPlainStorage();
+    storage.setSpillableComplexComponent(sccImpl);
+    return storage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
new file mode 100644
index 0000000..2bc9335
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkAppTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Local-mode run of {@link KeyedWindowedOperatorBenchmarkApp} with its store under {@link #basePath}.
+ * The run lasts {@link #DEFAULT_RUN_MILLIS} ms unless the {@link #PROP_RUN_MILLIS} system property
+ * supplies another duration, e.g. for a longer benchmark run.
+ */
+public class KeyedWindowedOperatorBenchmarkAppTest extends KeyedWindowedOperatorBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+  /** System property overriding how long (in ms) the local cluster runs. */
+  public static final String PROP_RUN_MILLIS = "windowed.benchmark.runMillis";
+  /** Default run time in ms; keeps the test short enough for a regular build. */
+  public static final long DEFAULT_RUN_MILLIS = 30000L;
+
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // build the benchmark DAG directly into the local-mode DAG; the app given to prepareDAG adds nothing
+    super.populateDAG(dag, conf);
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(Long.getLong(PROP_RUN_MILLIS, DEFAULT_RUN_MILLIS));
+
+    lc.shutdown();
+  }
+
+  /** Keeps the managed state store under {@link #basePath}, which {@link #before()} clears. */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
new file mode 100644
index 0000000..4a16396
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkAppTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.window;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Local-mode run of {@link WindowedOperatorBenchmarkApp} with its store under {@link #basePath}.
+ * The run lasts {@link #DEFAULT_RUN_MILLIS} ms unless the {@link #PROP_RUN_MILLIS} system property
+ * supplies another duration, e.g. for a longer benchmark run.
+ */
+public class WindowedOperatorBenchmarkAppTest extends WindowedOperatorBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+  /** System property overriding how long (in ms) the local cluster runs. */
+  public static final String PROP_RUN_MILLIS = "windowed.benchmark.runMillis";
+  /** Default run time in ms; keeps the test short enough for a regular build. */
+  public static final long DEFAULT_RUN_MILLIS = 30000L;
+
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // build the benchmark DAG directly into the local-mode DAG; the app given to prepareDAG adds nothing
+    super.populateDAG(dag, conf);
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(Long.getLong(PROP_RUN_MILLIS, DEFAULT_RUN_MILLIS));
+
+    lc.shutdown();
+  }
+
+  /** Keeps the managed state store under {@link #basePath}, which {@link #before()} clears. */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index 3fc0120..92e48b7 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -23,7 +23,7 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
 log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
+test.log.console.threshold=INFO
 
 log4j.appender.RFA=org.apache.log4j.RollingFileAppender
 log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7e22686b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 3c18b2f..4f2cefd 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -302,7 +302,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
         if (timeBucket != -1) {
           BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket);
           if (bucketedValue != null) {
-            if (timeBucket == cachedBucketMetas.firstKey()) {
+            if (!cachedBucketMetas.isEmpty() && timeBucket == cachedBucketMetas.firstKey()) {
               //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
               //Since the size of the whole time-bucket is added to total size, there is no need to add the size of
               //entries in file cache.
@@ -493,9 +493,11 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
         memoryFreed += originSize - (keyStream.size() + valueStream.size());
       }
 
-      //release the free memory immediately
-      keyStream.releaseAllFreeMemory();
-      valueStream.releaseAllFreeMemory();
+      if (memoryFreed > 0) {
+        //release the free memory immediately
+        keyStream.releaseAllFreeMemory();
+        valueStream.releaseAllFreeMemory();
+      }
 
       return memoryFreed;
     }
@@ -540,7 +542,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide
               //so it will be re-written by BucketsDataManager
               try {
                 BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket());
-                memoryFreed += tbm.getSizeInBytes();
+                if (tbm != null) {
+                  memoryFreed += tbm.getSizeInBytes();
+                }
                 LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
                 reader.close();
               } catch (IOException e) {


[2/2] apex-malhar git commit: Merge commit 'refs/pull/502/head' of github.com:apache/apex-malhar

Posted by da...@apache.org.
Merge commit 'refs/pull/502/head' of github.com:apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/91767c58
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/91767c58
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/91767c58

Branch: refs/heads/master
Commit: 91767c5891929b00577f7d9596ec427df1eb881d
Parents: bb3dca1 7e22686
Author: David Yan <da...@apache.org>
Authored: Fri Jan 6 15:42:09 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Fri Jan 6 15:42:09 2017 -0800

----------------------------------------------------------------------
 .../benchmark/window/AbstractGenerator.java     |  86 ++++++++
 .../AbstractWindowedOperatorBenchmarkApp.java   | 215 +++++++++++++++++++
 .../KeyedWindowedOperatorBenchmarkApp.java      | 138 ++++++++++++
 .../window/WindowedOperatorBenchmarkApp.java    | 123 +++++++++++
 .../KeyedWindowedOperatorBenchmarkAppTest.java  |  75 +++++++
 .../WindowedOperatorBenchmarkAppTest.java       |  75 +++++++
 benchmark/src/test/resources/log4j.properties   |   2 +-
 .../apex/malhar/lib/state/managed/Bucket.java   |  14 +-
 8 files changed, 722 insertions(+), 6 deletions(-)
----------------------------------------------------------------------