You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:02 UTC

[54/55] [abbrv] beam git commit: Improve NexmarkUtils: improve diskBusy() and remove unneeded randomization code

Improve NexmarkUtils: improve diskBusy() and remove unneeded randomization code

- Use state API in NexmarkUtils.diskBusy()
- Remove the commented-out code that disabled randomization in the direct runner: the direct runner no longer allows disabling randomization, and the queries and unit tests pass without it


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c116709
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c116709
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c116709

Branch: refs/heads/master
Commit: 6c116709fff06f7faa491a090f441f618931d256
Parents: ee500b2
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200

----------------------------------------------------------------------
 .../beam/integration/nexmark/NexmarkUtils.java  | 87 ++++++++++++--------
 .../nexmark/queries/NexmarkQuery.java           |  2 +-
 2 files changed, 52 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 7707429..7926690 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -18,14 +18,12 @@
 package org.apache.beam.integration.nexmark;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
-import java.util.List;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.AuctionCount;
@@ -66,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -289,8 +288,8 @@ public class NexmarkUtils {
   private static final boolean LOG_ERROR = true;
 
   /**
-   * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results in real-time with:
-   * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
+   * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results
+   * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
    */
   private static final boolean LOG_TO_CONSOLE = false;
 
@@ -340,14 +339,6 @@ public class NexmarkUtils {
    * Setup pipeline with codes and some other options.
    */
   public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
-    //TODO Ismael check
-//    PipelineRunner<?> runner = p.getRunner();
-//    if (runner instanceof DirectRunner) {
-//      // Disable randomization of output since we want to check batch and streaming match the
-//      // model both locally and on the cloud.
-//      ((DirectRunner) runner).withUnorderednessTesting(false);
-//    }
-
     CoderRegistry registry = p.getCoderRegistry();
     switch (coderStrategy) {
       case HAND:
@@ -565,35 +556,59 @@ public class NexmarkUtils {
                 });
   }
 
-  private static final StateSpec<ValueState<byte[]>> DUMMY_TAG =
-          StateSpecs.value(ByteArrayCoder.of());
   private static final int MAX_BUFFER_SIZE = 1 << 24;
 
+  private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>>{
+
+    private long bytes;
+
+    private DiskBusyTransform(long bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override public PCollection<T> expand(PCollection<T> input) {
+      // Add dummy key to be able to use State API
+      PCollection<KV<Integer, T>> kvCollection = input.apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() {
+
+        @ProcessElement public void processElement(ProcessContext context) {
+          context.output(KV.of(0, context.element()));
+        }
+      }));
+      // Apply actual transform that generates disk IO using state API
+      PCollection<T> output = kvCollection.apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() {
+
+        private static final String DISK_BUSY = "diskBusy";
+
+        @StateId(DISK_BUSY) private final StateSpec<ValueState<byte[]>> spec = StateSpecs
+            .value(ByteArrayCoder.of());
+
+        @ProcessElement public void processElement(ProcessContext c,
+            @StateId(DISK_BUSY) ValueState<byte[]> state) {
+          long remain = bytes;
+          long now = System.currentTimeMillis();
+          while (remain > 0) {
+            long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
+            remain -= thisBytes;
+            byte[] arr = new byte[(int) thisBytes];
+            for (int i = 0; i < thisBytes; i++) {
+              arr[i] = (byte) now;
+            }
+            state.write(arr);
+            now = System.currentTimeMillis();
+          }
+          c.output(c.element().getValue());
+        }
+      }));
+      return output;
+    }
+  }
+
+
   /**
    * Return a transform to write given number of bytes to durable store on every record.
    */
-  public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
-    return ParDo.of(new DoFn<T, T>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    long remain = bytes;
-//                    long now = System.currentTimeMillis();
-                    while (remain > 0) {
-                      //TODO Ismael google on state
-                      long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
-                      remain -= thisBytes;
-//                      byte[] arr = new byte[(int) thisBytes];
-//                      for (int i = 0; i < thisBytes; i++) {
-//                        arr[i] = (byte) now;
-//                      }
-//                      ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
-//                          StateNamespaces.global(), DUMMY_TAG);
-//                      state.write(arr);
-//                      now = System.currentTimeMillis();
-                    }
-                    c.output(c.element());
-                  }
-                });
+  public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) {
+    return new DiskBusyTransform<>(bytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
index 09415c0..8b74282 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -253,7 +253,7 @@ public abstract class NexmarkQuery
     if (configuration.diskBusyBytes > 0) {
       // Slow down by forcing bytes to durable store.
       events = events.apply(name + ".DiskBusy",
-              NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
+              NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes));
     }
 
     // Run the query.