You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/05/03 22:48:26 UTC

[1/2] incubator-beam git commit: [BEAM-255] Write: add limited logging

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3bcc5058a -> ec0e9fd7b


[BEAM-255] Write: add limited logging

This will help, for all sinks, users and developers gain insight into where time
is spent. (Enabling DEBUG level will provide more insight.)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad406696
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad406696
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad406696

Branch: refs/heads/master
Commit: ad406696793132b82450cad9fabf7fcea379158d
Parents: 3bcc505
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 3 13:07:04 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 13:42:44 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/Write.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad406696/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index b6743fa..9cb026a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -36,6 +36,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.UUID;
 
 /**
@@ -52,6 +55,8 @@ import java.util.UUID;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class Write {
+  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
   /**
    * Creates a Write transform that writes to the given Sink.
    */
@@ -144,7 +149,9 @@ public class Write {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
+              LOG.info("Initializing write operation {}", writeOperation);
               writeOperation.initialize(c.getPipelineOptions());
+              LOG.debug("Done initializing write operation {}", writeOperation);
               // The WriteOperation is also the output of this ParDo, so it can have mutable
               // state.
               c.output(writeOperation);
@@ -172,8 +179,10 @@ public class Write {
               // Lazily initialize the Writer
               if (writer == null) {
                 WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+                LOG.info("Opening writer for write operation {}", writeOperation);
                 writer = writeOperation.createWriter(c.getPipelineOptions());
                 writer.open(UUID.randomUUID().toString());
+                LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
               }
               try {
                 writer.write(c.element());
@@ -211,9 +220,12 @@ public class Write {
           .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
             @Override
             public void processElement(ProcessContext c) throws Exception {
-              Iterable<WriteT> results = c.sideInput(resultsView);
               WriteOperation<T, WriteT> writeOperation = c.element();
+              LOG.info("Finalizing write operation {}", writeOperation);
+              Iterable<WriteT> results = c.sideInput(resultsView);
+              LOG.debug("Side input initialized to finalize write operation {}", writeOperation);
               writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
             }
           }).withSideInputs(resultsView));
       return PDone.in(input.getPipeline());


[2/2] incubator-beam git commit: [BEAM-255] This closes #279

Posted by lc...@apache.org.
[BEAM-255] This closes #279


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec0e9fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec0e9fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec0e9fd7

Branch: refs/heads/master
Commit: ec0e9fd7b3ddf05ce7c0ee104c33406de7b3d091
Parents: 3bcc505 ad40669
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 3 13:43:12 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 3 13:43:12 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/Write.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------