You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/29 22:15:44 UTC

incubator-gobblin git commit: [GOBBLIN-325] Add a Source and Extractor for stress testing

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5d343e318 -> 55689d284


[GOBBLIN-325] Add a Source and Extractor for stress testing

* Configurable sleep time per record
* Configurable compute time per record
* Run duration or record count limit per extractor

Closes #2177 from htran1/stress_test_extractor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55689d28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55689d28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55689d28

Branch: refs/heads/master
Commit: 55689d284dacd6a60c019d1f52fd208ea0746537
Parents: 5d343e3
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Nov 29 14:15:32 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Nov 29 14:15:32 2017 -0800

----------------------------------------------------------------------
 .../gobblin/util/test/StressTestingSource.java  | 181 +++++++++++++++++++
 .../util/test/TestStressTestingSource.java      | 168 +++++++++++++++++
 2 files changed, 349 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
new file mode 100644
index 0000000..5d70219
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * A {@link Source} to be used for stress testing.
+ *
+ * <p>Each work unit is read by an {@link ExtractorImpl} that can be configured to spend busy-loop compute time and/or
+ * sleep time before returning each record. The size of every returned record can also be configured.
+ *
+ * <p>Configuration keys (all prefixed with {@value #CONFIG_NAMESPACE}):
+ * <ul>
+ *   <li>{@link #NUM_WORK_UNITS_KEY}: number of work units (and therefore extractors) to create.</li>
+ *   <li>{@link #RUN_DURATION_KEY}: how long, in seconds, each extractor emits records. When &gt; 0 it takes
+ *       precedence over {@link #NUM_RECORDS_KEY}.</li>
+ *   <li>{@link #COMPUTE_TIME_MICRO_KEY}: microseconds of busy computation performed per record.</li>
+ *   <li>{@link #SLEEP_TIME_MICRO_KEY}: microseconds slept per record.</li>
+ *   <li>{@link #NUM_RECORDS_KEY}: records emitted per extractor when no run duration is set. A value &lt;= 0 means
+ *       the extractor never stops on its own.</li>
+ *   <li>{@link #MEM_ALLOC_BYTES_KEY}: size, in bytes, of each returned record.</li>
+ * </ul>
+ */
+public class StressTestingSource implements Source<String, byte[]> {
+  public static final String CONFIG_NAMESPACE = "stressTest";
+  public static final String NUM_WORK_UNITS_KEY = CONFIG_NAMESPACE + "." + "numWorkUnits";
+  public static final int DEFAULT_NUM_WORK_UNITS = 1;
+  public static final String RUN_DURATION_KEY = CONFIG_NAMESPACE + "." + "runDurationSecs";
+  public static final int DEFAULT_RUN_DURATION = 0;
+  public static final String COMPUTE_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "computeTimeMicro";
+  public static final int DEFAULT_COMPUTE_TIME_MICRO = 0;
+  public static final String SLEEP_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "sleepTimeMicro";
+  public static final int DEFAULT_SLEEP_TIME = 0;
+  public static final String NUM_RECORDS_KEY = CONFIG_NAMESPACE + "." + "numRecords";
+  public static final int DEFAULT_NUM_RECORDS = 1;
+  public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes";
+  public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
+
+  /** Sentinel end time meaning "no run duration configured". */
+  private static final long INVALID_TIME = -1;
+
+  /**
+   * Creates the configured number of identical work units, all sharing a single append-only {@link Extract}.
+   *
+   * @param state source state holding the {@link #NUM_WORK_UNITS_KEY} setting
+   * @return the created work units
+   * @throws IllegalArgumentException if the configured number of work units is negative
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    int numWorkUnits = state.getPropAsInt(NUM_WORK_UNITS_KEY, DEFAULT_NUM_WORK_UNITS);
+    if (numWorkUnits < 0) {
+      throw new IllegalArgumentException(NUM_WORK_UNITS_KEY + " must be non-negative but was " + numWorkUnits);
+    }
+
+    Extract extract = new Extract(TableType.APPEND_ONLY,
+        StressTestingSource.class.getPackage().getName(),
+        StressTestingSource.class.getSimpleName());
+
+    List<WorkUnit> wus = new ArrayList<>(numWorkUnits);
+
+    for (int i = 0; i < numWorkUnits; ++i) {
+      wus.add(new WorkUnit(extract));
+    }
+
+    return wus;
+  }
+
+  @Override
+  public Extractor<String, byte[]> getExtractor(WorkUnitState state) {
+    return new ExtractorImpl(state);
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+    // Nothing to do
+  }
+
+  /**
+   * Extractor that synthesizes records, optionally spending compute and/or sleep time before returning each one.
+   *
+   * <p>{@link #readRecord(byte[])} returns {@code null} once the configured run duration has elapsed. If no duration
+   * is configured, it returns {@code null} once the configured number of records has been emitted.
+   */
+  public static class ExtractorImpl implements Extractor<String, byte[]> {
+    private int recordsEmitted = 0;
+    private final long startTime;
+    /** Wall-clock time (ms) after which no more records are returned, or {@code INVALID_TIME} if unset. */
+    private final long endTime;
+    /** Kept as a long: converting a large microsecond setting to nanoseconds overflows an int. */
+    private final long computeTimeNano;
+    private final int sleepTimeMicro;
+    private final int numRecords;
+    private final int memAllocBytes;
+    private final Random random;
+
+    public ExtractorImpl(WorkUnitState state) {
+      this.random = new Random();
+      this.startTime = System.currentTimeMillis();
+
+      int runDuration = state.getPropAsInt(RUN_DURATION_KEY, DEFAULT_RUN_DURATION);
+
+      // Set the end time based on the configured duration. TimeUnit converts in long arithmetic, so long
+      // durations cannot overflow an int and produce an end time in the past.
+      if (runDuration > 0) {
+        this.endTime = this.startTime + TimeUnit.SECONDS.toMillis(runDuration);
+      } else {
+        this.endTime = INVALID_TIME;
+      }
+
+      // Converted in long arithmetic: an int product would overflow for compute times above ~2.1 seconds.
+      this.computeTimeNano =
+          TimeUnit.MICROSECONDS.toNanos(state.getPropAsInt(COMPUTE_TIME_MICRO_KEY, DEFAULT_COMPUTE_TIME_MICRO));
+      this.sleepTimeMicro = state.getPropAsInt(SLEEP_TIME_MICRO_KEY, DEFAULT_SLEEP_TIME);
+      // num records only takes effect if the duration is not specified
+      this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0;
+      this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do
+    }
+
+    @Override
+    public String getSchema() throws IOException {
+      return "string";
+    }
+
+    /**
+     * Read a record with configurable idle and compute time.
+     *
+     * @param reuse ignored; a new array is returned for every record
+     * @return a record of {@link #MEM_ALLOC_BYTES_KEY} bytes, or {@code null} once the run duration has elapsed or
+     *         the record limit has been reached
+     */
+    @Override
+    public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException {
+
+      // If an end time is configured then it is used as the stopping point otherwise the record count limit is used
+      if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) ||
+          (this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) {
+        return null;
+      }
+
+      // spend time computing: busy-loop generating random bytes until the compute budget is used up
+      if (this.computeTimeNano > 0) {
+        final long startComputeNanoTime = System.nanoTime();
+        final byte[] bytes = new byte[100];
+
+        while (System.nanoTime() - startComputeNanoTime < this.computeTimeNano) {
+          this.random.nextBytes(bytes);
+        }
+      }
+
+      // sleep
+      if (this.sleepTimeMicro > 0) {
+        try {
+          TimeUnit.MICROSECONDS.sleep(this.sleepTimeMicro);
+        } catch (InterruptedException e) {
+          // Don't swallow the interrupt: restore the flag so code further up the stack can observe it.
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      this.recordsEmitted++;
+
+      return newMessage(this.memAllocBytes);
+    }
+
+    /**
+     * @return the configured record limit; this is 0 when a run duration is configured, because the record count
+     *         is then not known up front
+     */
+    @Override public long getExpectedRecordCount() {
+      return this.numRecords;
+    }
+
+    /** @return always 0; this source does not track a watermark */
+    @Override public long getHighWatermark() {
+      return 0;
+    }
+
+    /**
+     * Create a message of numBytes size.
+     *
+     * <p>The message starts with the UTF-8 decimal form of the number of records emitted so far. The remainder is
+     * zero-filled, or the number is truncated if it does not fit in {@code numBytes}.
+     *
+     * @param numBytes number of bytes to allocate for the message
+     */
+    private byte[] newMessage(int numBytes) {
+      byte[] stringBytes = String.valueOf(this.recordsEmitted).getBytes(Charsets.UTF_8);
+
+      return Arrays.copyOf(stringBytes, numBytes);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
new file mode 100644
index 0000000..aad8a3a
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Unit tests for {@link StressTestingSource}
+ */
+public class TestStressTestingSource {
+
+  @Test
+  public void testSourceExtractor() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 10;
+    final int COMPUTE_TIME_MICRO = 10;
+    final int NUM_RECORDS = 10000;
+
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    for (int i = 0; i < wus.size(); ++i) {
+      WorkUnit wu = wus.get(i);
+      WorkUnitState wuState = new WorkUnitState(wu, state);
+      Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+      Assert.assertEquals(extractor.getExpectedRecordCount(), NUM_RECORDS);
+      Assert.assertEquals(extractor.readRecord(null).length, 100);
+    }
+  }
+
+  @Test
+  public void testComputeTime() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 1;
+    final int COMPUTE_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    WorkUnit wu = wus.get(0);
+    WorkUnitState wuState = new WorkUnitState(wu, state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    byte[] record;
+    long startTimeNano = System.nanoTime();
+    while ((record = extractor.readRecord(null)) != null) {
+      Assert.assertEquals(record.length, 100);
+    }
+    long endTimeNano = System.nanoTime();
+
+    long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
+    // check that there is less than 2 second difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - (COMPUTE_TIME_MICRO * NUM_RECORDS)) < (2000000));
+  }
+
+  @Test
+  public void testSleepTime() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 1;
+    final int SLEEP_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    WorkUnit wu = wus.get(0);
+    WorkUnitState wuState = new WorkUnitState(wu, state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    byte[] record;
+    long startTimeNano = System.nanoTime();
+    while ((record = extractor.readRecord(null)) != null) {
+      Assert.assertEquals(record.length, 100);
+    }
+    long endTimeNano = System.nanoTime();
+
+    long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
+    // check that there is less than 2 second difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - (SLEEP_TIME_MICRO * NUM_RECORDS)) < (2000000));
+  }
+
+  @Test
+  public void testRunDuration() throws DataRecordException, IOException {
+    final int MEM_ALLOC_BYTES = 100;
+    final int NUM_WORK_UNITS = 1;
+    final int SLEEP_TIME_MICRO = 1000;
+    final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+    final int RUN_DURATION_SECS = 5;
+
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+    state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    WorkUnit wu = wus.get(0);
+    WorkUnitState wuState = new WorkUnitState(wu, state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    byte[] record;
+    long startTimeNano = System.nanoTime();
+    while ((record = extractor.readRecord(null)) != null) {
+      Assert.assertEquals(record.length, 100);
+    }
+    long endTimeNano = System.nanoTime();
+
+    long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
+    // check that there is less than 1 second difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000));
+  }
+}