You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/29 22:15:44 UTC
incubator-gobblin git commit: [GOBBLIN-325] Add a Source and
Extractor for stress testing
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 5d343e318 -> 55689d284
[GOBBLIN-325] Add a Source and Extractor for stress testing
* Configurable sleep time per record
* Configurable compute time per record
* Run duration or record count limit per extractor
Closes #2177 from htran1/stress_test_extractor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55689d28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55689d28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55689d28
Branch: refs/heads/master
Commit: 55689d284dacd6a60c019d1f52fd208ea0746537
Parents: 5d343e3
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Nov 29 14:15:32 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Nov 29 14:15:32 2017 -0800
----------------------------------------------------------------------
.../gobblin/util/test/StressTestingSource.java | 181 +++++++++++++++++++
.../util/test/TestStressTestingSource.java | 168 +++++++++++++++++
2 files changed, 349 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
new file mode 100644
index 0000000..5d70219
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * A {@link Source} to be used for stress testing.
+ *
+ * <p>This source uses an extractor that can be configured to have sleep and computation time before returning a
+ * record. The size of the returned record can also be configured. All settings are read from keys under the
+ * {@value #CONFIG_NAMESPACE} namespace.
+ */
+public class StressTestingSource implements Source<String, byte[]> {
+  public static final String CONFIG_NAMESPACE = "stressTest";
+
+  /** Number of work units (and therefore extractors) to create. */
+  public static final String NUM_WORK_UNITS_KEY = CONFIG_NAMESPACE + "." + "numWorkUnits";
+  public static final int DEFAULT_NUM_WORK_UNITS = 1;
+
+  /** Run duration per extractor in seconds; a value that is not positive disables the time limit. */
+  public static final String RUN_DURATION_KEY = CONFIG_NAMESPACE + "." + "runDurationSecs";
+  public static final int DEFAULT_RUN_DURATION = 0;
+
+  /** Time in microseconds to spend busy-computing before each record is returned. */
+  public static final String COMPUTE_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "computeTimeMicro";
+  public static final int DEFAULT_COMPUTE_TIME_MICRO = 0;
+
+  /** Time in microseconds to sleep before each record is returned. */
+  public static final String SLEEP_TIME_MICRO_KEY = CONFIG_NAMESPACE + "." + "sleepTimeMicro";
+  public static final int DEFAULT_SLEEP_TIME = 0;
+
+  /** Number of records to emit per extractor; only honored when no run duration is configured. */
+  public static final String NUM_RECORDS_KEY = CONFIG_NAMESPACE + "." + "numRecords";
+  public static final int DEFAULT_NUM_RECORDS = 1;
+
+  /** Size in bytes of each returned record. */
+  public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes";
+  public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
+
+  /** Sentinel end time meaning "no run duration configured". */
+  private static final long INVALID_TIME = -1;
+
+  /**
+   * Creates {@link #NUM_WORK_UNITS_KEY} work units that all share a single append-only {@link Extract}.
+   *
+   * @param state source state carrying the stress test configuration
+   * @return the list of created work units
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    int numWorkUnits = state.getPropAsInt(NUM_WORK_UNITS_KEY, DEFAULT_NUM_WORK_UNITS);
+
+    Extract extract = new Extract(TableType.APPEND_ONLY,
+        StressTestingSource.class.getPackage().getName(),
+        StressTestingSource.class.getSimpleName());
+
+    List<WorkUnit> wus = new ArrayList<>(numWorkUnits);
+
+    for (int i = 1; i <= numWorkUnits; ++i) {
+      WorkUnit wu = new WorkUnit(extract);
+      wus.add(wu);
+    }
+
+    return wus;
+  }
+
+  /**
+   * @param state work unit state carrying the stress test configuration
+   * @return a new {@link ExtractorImpl} configured from {@code state}
+   */
+  @Override
+  public Extractor<String, byte[]> getExtractor(WorkUnitState state) {
+    return new ExtractorImpl(state);
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+    // Nothing to do
+  }
+
+  /**
+   * Extractor that emits fixed-size byte array records, optionally burning CPU and/or sleeping before each one.
+   *
+   * <p>Extraction stops once the configured run duration has elapsed or, when no duration is configured, once the
+   * configured number of records has been emitted. If neither a positive duration nor a positive record count is
+   * configured, {@link #readRecord(byte[])} never returns {@code null}.
+   */
+  public static class ExtractorImpl implements Extractor<String, byte[]> {
+    // long so that duration-bound runs cannot wrap the counter
+    private long recordsEmitted = 0;
+    private final long startTime;
+    private final long endTime;
+    // long so that compute times above ~2.1 seconds do not overflow when converted to nanoseconds
+    private final long computeTimeNano;
+    private final int sleepTimeMicro;
+    private final int numRecords;
+    private final int memAllocBytes;
+    private final Random random;
+
+    /**
+     * Reads the stress test configuration from {@code state} and starts the run duration clock.
+     *
+     * @param state work unit state carrying the stress test configuration
+     */
+    public ExtractorImpl(WorkUnitState state) {
+      this.random = new Random();
+      this.startTime = System.currentTimeMillis();
+
+      int runDuration = state.getPropAsInt(RUN_DURATION_KEY, DEFAULT_RUN_DURATION);
+
+      // set the end time based on the configured duration; the conversion is done in long arithmetic so that large
+      // durations do not overflow int before being added to the start time
+      if (runDuration > 0) {
+        this.endTime = this.startTime + TimeUnit.SECONDS.toMillis(runDuration);
+      } else {
+        this.endTime = INVALID_TIME;
+      }
+
+      this.computeTimeNano =
+          TimeUnit.MICROSECONDS.toNanos(state.getPropAsInt(COMPUTE_TIME_MICRO_KEY, DEFAULT_COMPUTE_TIME_MICRO));
+      this.sleepTimeMicro = state.getPropAsInt(SLEEP_TIME_MICRO_KEY, DEFAULT_SLEEP_TIME);
+      // num records only takes effect if the duration is not specified
+      this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0;
+      this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do
+    }
+
+    /**
+     * @return the constant schema name {@code "string"}
+     */
+    @Override
+    public String getSchema() throws IOException {
+      return "string";
+    }
+
+    /**
+     * Read a record with configurable idle and compute time.
+     *
+     * @param reuse ignored; a new array is allocated for every record
+     * @return a record of the configured size, or {@code null} once the run duration has elapsed or the record
+     *         count limit has been reached
+     **/
+    @Override
+    public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException {
+
+      // If an end time is configured then it is used as the stopping point otherwise the record count limit is used
+      if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) ||
+          (this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) {
+        return null;
+      }
+
+      // spend time computing
+      if (this.computeTimeNano > 0) {
+        final long startComputeNanoTime = System.nanoTime();
+        final byte[] bytes = new byte[100];
+
+        // busy loop: keep generating random bytes until the configured compute time has elapsed
+        while (System.nanoTime() - startComputeNanoTime < this.computeTimeNano) {
+          random.nextBytes(bytes);
+        }
+      }
+
+      // sleep
+      if (this.sleepTimeMicro > 0) {
+        try {
+          TimeUnit.MICROSECONDS.sleep(this.sleepTimeMicro);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so that code further up the stack can observe the interruption. The
+          // record is still returned, so an interrupt only shortens the sleep.
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      this.recordsEmitted++;
+
+      return newMessage(this.memAllocBytes);
+    }
+
+    /**
+     * @return the configured record count limit, or 0 when a run duration is configured
+     */
+    @Override
+    public long getExpectedRecordCount() {
+      return this.numRecords;
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getHighWatermark() {
+      return 0;
+    }
+
+    /**
+     * Create a message of numBytes size.
+     *
+     * <p>The message starts with the UTF-8 encoded decimal count of records emitted so far and is zero-padded, or
+     * truncated, to exactly {@code numBytes} bytes.
+     *
+     * @param numBytes number of bytes to allocate for the message
+     * @return the newly allocated message
+     */
+    private byte[] newMessage(int numBytes) {
+      byte[] stringBytes = String.valueOf(this.recordsEmitted).getBytes(Charsets.UTF_8);
+
+      return Arrays.copyOf(stringBytes, numBytes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55689d28/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
new file mode 100644
index 0000000..aad8a3a
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Unit tests for {@link StressTestingSource}
+ */
+public class TestStressTestingSource {
+  /** Record size configured for every test; each returned record is expected to have exactly this length. */
+  private static final int MEM_ALLOC_BYTES = 100;
+
+  /**
+   * Verifies that the source creates the configured number of work units and that each extractor reports the
+   * configured record count and returns records of the configured size.
+   */
+  @Test
+  public void testSourceExtractor() throws DataRecordException, IOException {
+    final int NUM_WORK_UNITS = 10;
+    final int COMPUTE_TIME_MICRO = 10;
+    final int NUM_RECORDS = 10000;
+
+    SourceState state = newState(NUM_WORK_UNITS, NUM_RECORDS);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+    for (WorkUnit wu : wus) {
+      WorkUnitState wuState = new WorkUnitState(wu, state);
+      Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+      try {
+        Assert.assertEquals(extractor.getExpectedRecordCount(), NUM_RECORDS);
+        Assert.assertEquals(extractor.readRecord(null).length, MEM_ALLOC_BYTES);
+      } finally {
+        extractor.close();
+      }
+    }
+  }
+
+  /**
+   * Verifies that the total extraction time is close to the configured per-record compute time multiplied by the
+   * number of records.
+   */
+  @Test
+  public void testComputeTime() throws DataRecordException, IOException {
+    final int COMPUTE_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.COMPUTE_TIME_MICRO_KEY, COMPUTE_TIME_MICRO);
+
+    long timeSpentMicro = drainSingleExtractorMicros(state);
+    long expectedMicro = (long) COMPUTE_TIME_MICRO * NUM_RECORDS;
+
+    // check that there is less than 2 seconds difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - expectedMicro) < 2000000L,
+        "Expected about " + expectedMicro + " us but took " + timeSpentMicro + " us");
+  }
+
+  /**
+   * Verifies that the total extraction time is close to the configured per-record sleep time multiplied by the
+   * number of records.
+   */
+  @Test
+  public void testSleepTime() throws DataRecordException, IOException {
+    final int SLEEP_TIME_MICRO = 10000;
+    final int NUM_RECORDS = 500;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+
+    long timeSpentMicro = drainSingleExtractorMicros(state);
+    long expectedMicro = (long) SLEEP_TIME_MICRO * NUM_RECORDS;
+
+    // check that there is less than 2 seconds difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - expectedMicro) < 2000000L,
+        "Expected about " + expectedMicro + " us but took " + timeSpentMicro + " us");
+  }
+
+  /**
+   * Verifies that a configured run duration takes precedence over the record count and bounds the extraction time.
+   */
+  @Test
+  public void testRunDuration() throws DataRecordException, IOException {
+    final int SLEEP_TIME_MICRO = 1000;
+    final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+    final int RUN_DURATION_SECS = 5;
+
+    SourceState state = newState(1, NUM_RECORDS);
+    state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+    state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+
+    long timeSpentMicro = drainSingleExtractorMicros(state);
+    long expectedMicro = RUN_DURATION_SECS * 1000000L;
+
+    // check that there is less than 1 second difference between expected and actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - expectedMicro) < 1000000L,
+        "Expected about " + expectedMicro + " us but took " + timeSpentMicro + " us");
+  }
+
+  /**
+   * Builds a source state with the settings shared by all tests.
+   *
+   * @param numWorkUnits number of work units the source should create
+   * @param numRecords record count limit per extractor
+   * @return a new state with the work unit count, record size and record count set
+   */
+  private static SourceState newState(int numWorkUnits, int numRecords) {
+    SourceState state = new SourceState();
+    state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, numWorkUnits);
+    state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+    state.setProp(StressTestingSource.NUM_RECORDS_KEY, numRecords);
+    return state;
+  }
+
+  /**
+   * Creates the single extractor described by {@code state}, reads it until it is exhausted while checking the size
+   * of every record, and closes it.
+   *
+   * @param state source state expected to produce exactly one work unit
+   * @return the time spent reading all records, in microseconds
+   */
+  private static long drainSingleExtractorMicros(SourceState state) throws DataRecordException, IOException {
+    StressTestingSource source = new StressTestingSource();
+
+    List<WorkUnit> wus = source.getWorkunits(state);
+    Assert.assertEquals(wus.size(), 1);
+
+    WorkUnitState wuState = new WorkUnitState(wus.get(0), state);
+    Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+    try {
+      byte[] record;
+      long startTimeNano = System.nanoTime();
+      while ((record = extractor.readRecord(null)) != null) {
+        Assert.assertEquals(record.length, MEM_ALLOC_BYTES);
+      }
+      long endTimeNano = System.nanoTime();
+
+      return (endTimeNano - startTimeNano) / 1000;
+    } finally {
+      extractor.close();
+    }
+  }
+}