You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:02 UTC
[54/55] [abbrv] beam git commit: Improve NexmarkUtils: improve
diskBusy() and remove unneeded randomization code
Improve NexmarkUtils: improve diskBusy() and remove unneeded randomization code
- Use state API in NexmarkUtils.diskBusy()
- Remove commented-out code that disabled direct runner randomization: the direct runner no longer allows disabling randomization, and the queries and unit tests pass without it
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c116709
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c116709
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c116709
Branch: refs/heads/master
Commit: 6c116709fff06f7faa491a090f441f618931d256
Parents: ee500b2
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200
----------------------------------------------------------------------
.../beam/integration/nexmark/NexmarkUtils.java | 87 ++++++++++++--------
.../nexmark/queries/NexmarkQuery.java | 2 +-
2 files changed, 52 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 7707429..7926690 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -18,14 +18,12 @@
package org.apache.beam.integration.nexmark;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
-import java.util.List;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.AuctionCount;
@@ -66,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -289,8 +288,8 @@ public class NexmarkUtils {
private static final boolean LOG_ERROR = true;
/**
- * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results in real-time with:
- * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
+ * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results
+ * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
*/
private static final boolean LOG_TO_CONSOLE = false;
@@ -340,14 +339,6 @@ public class NexmarkUtils {
* Setup pipeline with codes and some other options.
*/
public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
- //TODO Ismael check
-// PipelineRunner<?> runner = p.getRunner();
-// if (runner instanceof DirectRunner) {
-// // Disable randomization of output since we want to check batch and streaming match the
-// // model both locally and on the cloud.
-// ((DirectRunner) runner).withUnorderednessTesting(false);
-// }
-
CoderRegistry registry = p.getCoderRegistry();
switch (coderStrategy) {
case HAND:
@@ -565,35 +556,59 @@ public class NexmarkUtils {
});
}
- private static final StateSpec<ValueState<byte[]>> DUMMY_TAG =
- StateSpecs.value(ByteArrayCoder.of());
private static final int MAX_BUFFER_SIZE = 1 << 24;
+  /**
+   * Pass-through transform that writes {@code bytes} bytes to a {@link ValueState} for every input
+   * element and then re-emits the element unchanged.
+   *
+   * <p>The State API only works on keyed input, so every element is first paired with the constant
+   * dummy key {@code 0}. NOTE(review): routing everything through a single key likely serializes
+   * the state-writing step onto one key; consider spreading elements over several keys if this
+   * step becomes an unintended bottleneck.
+   */
+  private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+    /** Number of bytes written to state for each element. */
+    private final long bytes;
+
+    private DiskBusyTransform(long bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      // Add a dummy key so that the State API (which requires keyed input) can be used.
+      PCollection<KV<Integer, T>> kvCollection =
+          input.apply("diskBusy.keyElements", ParDo.of(new KeyElementsFn<T>()));
+      // Apply the actual transform that generates disk IO using the State API.
+      return kvCollection.apply("diskBusy.generateIO", ParDo.of(new GenerateIoFn<T>(bytes)));
+    }
+
+    /**
+     * Pairs every element with the constant dummy key {@code 0}. Static so it does not capture
+     * the enclosing transform.
+     */
+    private static class KeyElementsFn<V> extends DoFn<V, KV<Integer, V>> {
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        context.output(KV.of(0, context.element()));
+      }
+    }
+
+    /**
+     * Writes {@code bytes} bytes to a {@link ValueState}, in chunks of at most
+     * {@code MAX_BUFFER_SIZE} bytes, then outputs the element's value. Static so that only
+     * {@code bytes} (not the enclosing transform) is serialized with the DoFn.
+     */
+    private static class GenerateIoFn<V> extends DoFn<KV<Integer, V>, V> {
+
+      private static final String DISK_BUSY = "diskBusy";
+
+      @StateId(DISK_BUSY)
+      private final StateSpec<ValueState<byte[]>> spec = StateSpecs.value(ByteArrayCoder.of());
+
+      private final long bytes;
+
+      private GenerateIoFn(long bytes) {
+        this.bytes = bytes;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c, @StateId(DISK_BUSY) ValueState<byte[]> state) {
+        long remain = bytes;
+        long now = System.currentTimeMillis();
+        while (remain > 0) {
+          // Chunks are capped at MAX_BUFFER_SIZE, which fits in an int, so the cast is safe.
+          long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
+          remain -= thisBytes;
+          byte[] arr = new byte[(int) thisBytes];
+          for (int i = 0; i < thisBytes; i++) {
+            arr[i] = (byte) now;
+          }
+          // Each write replaces the previous chunk; the point is the IO, not the stored value.
+          state.write(arr);
+          now = System.currentTimeMillis();
+        }
+        c.output(c.element().getValue());
+      }
+    }
+  }
+
+
/**
 * Returns a pass-through transform that writes the given number of bytes to durable state for
 * every record, to slow the pipeline down.
 *
 * <p>Implemented by {@link DiskBusyTransform}: each element is paired with the dummy key
 * {@code 0} so the State API can be used, then {@code bytes} bytes are written to a
 * {@code ValueState} in chunks of at most {@code MAX_BUFFER_SIZE} bytes, and finally the original
 * element is emitted unchanged. NOTE(review): the single dummy key likely limits the parallelism
 * of this step; confirm against runner behavior.
 *
 * @param <T> element type of the input and output collections
 * @param bytes number of bytes to write per record; nothing is written when this is not positive
 * @return a transform whose output contains the same elements as its input
 */
- public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
- return ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- long remain = bytes;
-// long now = System.currentTimeMillis();
- while (remain > 0) {
- //TODO Ismael google on state
- long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
- remain -= thisBytes;
-// byte[] arr = new byte[(int) thisBytes];
-// for (int i = 0; i < thisBytes; i++) {
-// arr[i] = (byte) now;
-// }
-// ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
-// StateNamespaces.global(), DUMMY_TAG);
-// state.write(arr);
-// now = System.currentTimeMillis();
- }
- c.output(c.element());
- }
- });
+ public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) {
+ return new DiskBusyTransform<>(bytes);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
index 09415c0..8b74282 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -253,7 +253,7 @@ public abstract class NexmarkQuery
if (configuration.diskBusyBytes > 0) {
// Slow down by forcing bytes to durable store.
events = events.apply(name + ".DiskBusy",
- NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
+ NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes));
}
// Run the query.