You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 23:26:06 UTC

apex-malhar git commit: APEXMALHAR-2205 #resolve #comment State management benchmark

Repository: apex-malhar
Updated Branches:
  refs/heads/master f006ac6f5 -> c5a12e4e7


APEXMALHAR-2205 #resolve #comment State management benchmark


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5a12e4e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5a12e4e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5a12e4e

Branch: refs/heads/master
Commit: c5a12e4e747c5be840a16e4c932cbc1dbff79894
Parents: f006ac6
Author: brightchen <br...@datatorrent.com>
Authored: Fri Aug 26 16:09:12 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Thu Sep 1 15:00:29 2016 -0700

----------------------------------------------------------------------
 benchmark/pom.xml                               |   5 +
 .../state/ManagedStateBenchmarkApp.java         | 215 +++++++++++++++++++
 .../benchmark/state/StoreOperator.java          | 127 +++++++++++
 .../state/ManagedStateBenchmarkAppTester.java   |  70 ++++++
 4 files changed, 417 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index f09ae81..d5451b9 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -595,6 +595,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.9.4</version>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
new file mode 100644
index 0000000..25e3971
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Benchmark application for {@link ManagedStateImpl}.
+ * <p>
+ * A {@link TestGenerator} emits key/value pairs which a {@link StoreOperator} writes into a managed state store.
+ * Both operators share one {@link TestStatsListener}, which can throttle the generator when the store falls behind
+ * (enabled by setting {@code dt.ManagedStateBenchmark.adjustRate} to {@code true}).
+ */
+@ApplicationAnnotation(name = "ManagedStateBenchmark")
+public class ManagedStateBenchmarkApp implements StreamingApplication
+{
+  /** Configuration property holding the base path of the managed state store. */
+  protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath";
+  /** Base path used when {@link #PROP_STORE_PATH} is unset or empty. */
+  protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Both operators get the same listener instance: it reads the upstream window id from the generator and the
+    // downstream window id from the store, and derives the generator's rate from the gap between the two.
+    TestStatsListener sl = new TestStatsListener();
+    sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
+    TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+    dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+    StoreOperator storeOperator = new StoreOperator();
+    storeOperator.setStore(createStore(conf));
+    StoreOperator store = dag.addOperator("Store", storeOperator);
+
+    dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+
+    dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Creates the managed state store written to by the {@link StoreOperator}.
+   *
+   * @param conf application configuration, used to resolve the store base path
+   * @return a new store whose file access is rooted at {@link #getStoreBasePath(Configuration)}
+   */
+  public ManagedStateImpl createStore(Configuration conf)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedStateImpl store = new ManagedStateImpl();
+    // assumes the store's default file access is a TFileImpl.DTFileImpl; the cast fails otherwise
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+    return store;
+  }
+
+  /**
+   * Resolves the base path of the managed state store.
+   *
+   * @param conf application configuration
+   * @return the value of {@link #PROP_STORE_PATH}, or {@link #DEFAULT_BASE_PATH} when it is unset or empty
+   */
+  public String getStoreBasePath(Configuration conf)
+  {
+    String basePath = conf.get(PROP_STORE_PATH);
+    if (basePath == null || basePath.isEmpty()) {
+      basePath = DEFAULT_BASE_PATH;
+    }
+    return basePath;
+  }
+
+  /**
+   * Input operator emitting pairs of a 16-byte key and a constant 1000-byte value.
+   * <p>
+   * A key is a random timestamp within the current {@link #getRange() range} (milliseconds) followed by the index
+   * of the tuple in its batch. At most {@link #getRate() rate} tuples are emitted per window.
+   */
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+    int emitBatchSize = 1000;
+    byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
+    int rate = 20000;
+    int emitCount = 0;
+    private final Random random = new Random();
+    private int range = 1000 * 60; // one minute range of hot keys
+
+    public int getEmitBatchSize()
+    {
+      return emitBatchSize;
+    }
+
+    public void setEmitBatchSize(int emitBatchSize)
+    {
+      this.emitBatchSize = emitBatchSize;
+    }
+
+    public int getRate()
+    {
+      return rate;
+    }
+
+    public void setRate(int rate)
+    {
+      this.rate = rate;
+    }
+
+    public int getRange()
+    {
+      return range;
+    }
+
+    /**
+     * @param range width, in milliseconds, of the window of hot key timestamps; must be positive
+     * @throws IllegalArgumentException if {@code range} is not positive
+     */
+    public void setRange(int range)
+    {
+      if (range <= 0) {
+        // emitTuples computes timestamp % range and Random.nextInt(range); both fail for non-positive values
+        throw new IllegalArgumentException("range must be positive: " + range);
+      }
+      this.range = range;
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      emitCount = 0;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      long timestamp = System.currentTimeMillis();
+      // start of the current range; it does not change inside the loop
+      long rangeStart = timestamp - timestamp % range;
+      for (int i = 0; i < emitBatchSize && emitCount < rate; i++) {
+        byte[] key = ByteBuffer.allocate(16).putLong(rangeStart + random.nextInt(range)).putLong(i).array();
+        data.emit(new KeyValPair<byte[], byte[]>(key, val));
+        emitCount++;
+      }
+    }
+  }
+
+  /**
+   * Stats listener, shared by the generator and the store, that adapts the generator's rate to the store's lag.
+   * <p>
+   * Operators reporting input-port stats are treated as downstream, all others as upstream. While the upstream
+   * window id is less than 5 windows ahead of the downstream one, the target rate grows by 10% (from at least
+   * 1000); when it is more than 20 windows ahead the rate shrinks by 10% and the generator is paused (rate 0) until
+   * the downstream window id exceeds {@code resumewid}. Requests are only sent when {@code adjustRate} is set.
+   */
+  public static class TestStatsListener implements StatsListener, Serializable
+  {
+    private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);
+    private static final long serialVersionUID = 1L;
+
+    long uwId;
+    long dwId;
+    long resumewid;
+    int rate;
+    int queueSize;
+    boolean adjustRate;
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      if (!stats.getLastWindowedStats().isEmpty()) {
+        OperatorStats os = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+        if (os.inputPorts != null && !os.inputPorts.isEmpty()) {
+          // downstream operator
+          dwId = os.windowId;
+          queueSize = os.inputPorts.get(0).queueSize;
+          if (uwId - dwId < 5) {
+            // keep operator busy; saturate rather than overflow into a negative rate
+            rate = Math.max(1000, rate);
+            rate = (int)Math.min(Integer.MAX_VALUE, rate + (long)rate / 10);
+          } else if (uwId - dwId > 20) {
+            // operator is behind
+            if (resumewid < dwId) {
+              resumewid = uwId - 15;
+              rate -= rate / 10;
+            }
+          }
+        } else {
+          LOG.debug("uwid-dwid {} skip {} rate {}, queueSize {}", uwId - dwId, resumewid - dwId, rate, queueSize);
+          // upstream operator
+          uwId = os.windowId;
+          if (adjustRate) {
+            // fresh request each time so a request already handed to the engine is never mutated later
+            SetPropertyRequest cmd = new SetPropertyRequest();
+            cmd.rate = resumewid < dwId ? rate : 0;
+            Response rsp = new Response();
+            rsp.operatorRequests = Lists.newArrayList(cmd);
+            return rsp;
+          }
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Operator request that sets the {@code rate} of a {@link TestGenerator}; other operators are left untouched.
+     */
+    public static class SetPropertyRequest implements OperatorRequest, Serializable
+    {
+      private static final long serialVersionUID = 1L;
+      int rate;
+
+      @Override
+      public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException
+      {
+        if (oper instanceof TestGenerator) {
+          LOG.debug("Setting rate to {}", rate);
+          ((TestGenerator)oper).rate = rate;
+        }
+        return null;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
new file mode 100644
index 0000000..0d9c42b
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Operator that writes every incoming key/value pair into a {@link ManagedStateImpl} and periodically logs the
+ * observed throughput.
+ * <p>
+ * All tuples are put into bucket {@link #bucketId}; every {@link #numOfWindowPerStatistics} windows the number of
+ * processed tuples, the elapsed time and the resulting rate are logged.
+ */
+public class StoreOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
+
+  /** Number of windows between two throughput log lines. */
+  protected static final int numOfWindowPerStatistics = 10;
+
+  protected ManagedStateImpl store;
+  protected long bucketId = 1;
+
+  protected long lastCheckPointWindowId = -1;
+  protected long currentWindowId;
+  protected long tupleCount = 0;
+  protected int windowCountPerStatistics = 0;
+  protected long statisticsBeginTime = 0;
+
+  public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+  {
+    @Override
+    public void process(KeyValPair<byte[], byte[]> tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    store.setup(context);
+  }
+
+  /**
+   * Tears down the store, mirroring {@link #setup(OperatorContext)}; without this override the store that was set
+   * up there would never be torn down.
+   */
+  @Override
+  public void teardown()
+  {
+    store.teardown();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    store.beginWindow(windowId);
+    // the first statistics interval starts with the first window
+    if (statisticsBeginTime <= 0) {
+      statisticsBeginTime = System.currentTimeMillis();
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    store.endWindow();
+    if (++windowCountPerStatistics >= numOfWindowPerStatistics) {
+      logStatistics();
+      windowCountPerStatistics = 0;
+    }
+  }
+
+  /**
+   * Writes one tuple into the store under {@link #bucketId} and counts it.
+   *
+   * @param tuple key/value pair to store
+   */
+  protected void processTuple(KeyValPair<byte[], byte[]> tuple)
+  {
+    Slice key = new Slice(tuple.getKey());
+    Slice value = new Slice(tuple.getValue());
+    store.put(bucketId, key, value);
+    ++tupleCount;
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    // delegate like the other lifecycle callbacks so the store sees every checkpoint notification
+    store.checkpointed(windowId);
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    store.committed(windowId);
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    store.beforeCheckpoint(windowId);
+    logger.info("beforeCheckpoint {}", windowId);
+  }
+
+  public ManagedStateImpl getStore()
+  {
+    return store;
+  }
+
+  public void setStore(ManagedStateImpl store)
+  {
+    this.store = store;
+  }
+
+  /**
+   * Logs the tuples processed since the previous call, the elapsed time and the resulting rate, then starts a new
+   * statistics interval.
+   */
+  protected void logStatistics()
+  {
+    long spentTime = System.currentTimeMillis() - statisticsBeginTime;
+    // tuples per second; clamp the interval to 1 ms so a zero-length interval cannot divide by zero
+    long rate = tupleCount * 1000 / Math.max(1, spentTime);
+    logger.info("Time Spent: {} ms, Processed tuples: {}, rate: {} tuples/s", spentTime, tupleCount, rate);
+
+    statisticsBeginTime = System.currentTimeMillis();
+    tupleCount = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5a12e4e/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
new file mode 100644
index 0000000..ca5e245
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Runs {@link ManagedStateBenchmarkApp} in local mode.
+ * <p>
+ * This is not really a unit test but a benchmark runner, provided so that developers can run the benchmark
+ * conveniently from a local IDE. The store is written under {@link #basePath}.
+ */
+public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+
+  /** How long the local cluster runs, in milliseconds (five minutes). */
+  private static final long RUN_MILLIS = 5 * 60 * 1000;
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration configuration = new Configuration(false);
+
+    // fill the local-mode DAG directly; the no-op application handed to prepareDAG adds nothing further
+    DAG dag = localMode.getDAG();
+    super.populateDAG(dag, configuration);
+
+    StreamingApplication noOpApp = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG ignored, Configuration ignoredConf)
+      {
+        // intentionally empty: the DAG was already populated above
+      }
+    };
+    localMode.prepareDAG(noOpApp, configuration);
+
+    LocalMode.Controller controller = localMode.getController();
+    controller.run(RUN_MILLIS);
+    controller.shutdown();
+  }
+
+  /**
+   * Keeps the store under {@link #basePath}, ignoring the configuration.
+   */
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}