You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:45 UTC

[37/55] [abbrv] beam git commit: Fix query10 log messages

Fix query10 log messages

issue #5 and issue #51


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee500b28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee500b28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee500b28

Branch: refs/heads/master
Commit: ee500b28086f1261101395dc0b7b23f197ba19d9
Parents: 3d5c3d0
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                |  6 +++
 .../integration/nexmark/queries/Query10.java    | 39 ++++++++------------
 2 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 86b88bd..664a410 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -255,6 +255,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index c868666..378d01e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.integration.nexmark.queries;
 
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -33,9 +30,9 @@ import org.apache.beam.integration.nexmark.model.Done;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -57,7 +54,6 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Query "10", 'Log to sharded files' (Not in original suite.)
  *
@@ -132,12 +128,9 @@ public class Query10 extends NexmarkQuery {
    */
   private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
       throws IOException {
-    //TODO Decide what to do about this one
-//    WritableByteChannel channel =
-//            GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
-//    checkState(channel instanceof GoogleCloudStorageWriteChannel);
-//    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
-//    return channel;
+    //TODO
+    // Fix after PR: right now this is a specific Google added use case
+    // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
     throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
   }
 
@@ -192,7 +185,7 @@ public class Query10 extends NexmarkQuery {
           public void processElement(ProcessContext c) {
             if (c.element().hasAnnotation("LATE")) {
               lateCounter.inc();
-              LOG.error("Observed late: %s", c.element());
+              LOG.info("Observed late: %s", c.element());
             } else {
               onTimeCounter.inc();
             }
@@ -240,11 +233,11 @@ public class Query10 extends NexmarkQuery {
               }
             }
             String shard = c.element().getKey();
-            LOG.error(
+            LOG.info(String.format(
                 "%s with timestamp %s has %d actually late and %d on-time "
                     + "elements in pane %s for window %s",
                 shard, c.timestamp(), numLate, numOnTime, c.pane(),
-                window.maxTimestamp());
+                window.maxTimestamp()));
             if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
               if (numLate == 0) {
                 LOG.error(
@@ -283,11 +276,11 @@ public class Query10 extends NexmarkQuery {
               String shard = c.element().getKey();
               GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
               OutputFile outputFile = outputFileFor(window, shard, c.pane());
-              LOG.error(
+              LOG.info(String.format(
                   "Writing %s with record timestamp %s, window timestamp %s, pane %s",
-                  shard, c.timestamp(), window.maxTimestamp(), c.pane());
+                  shard, c.timestamp(), window.maxTimestamp(), c.pane()));
               if (outputFile.filename != null) {
-                LOG.error("Beginning write to '%s'", outputFile.filename);
+                LOG.info("Beginning write to '%s'", outputFile.filename);
                 int n = 0;
                 try (OutputStream output =
                          Channels.newOutputStream(openWritableGcsFile(options, outputFile
@@ -296,12 +289,12 @@ public class Query10 extends NexmarkQuery {
                     Event.CODER.encode(event, output, Coder.Context.OUTER);
                     writtenRecordsCounter.inc();
                     if (++n % 10000 == 0) {
-                      LOG.error("So far written %d records to '%s'", n,
+                      LOG.info("So far written %d records to '%s'", n,
                           outputFile.filename);
                     }
                   }
                 }
-                LOG.error("Written all %d records to '%s'", n, outputFile.filename);
+                LOG.info("Written all %d records to '%s'", n, outputFile.filename);
               }
               savedFileCounter.inc();
               c.output(KV.<Void, OutputFile>of(null, outputFile));
@@ -341,23 +334,23 @@ public class Query10 extends NexmarkQuery {
               LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
             } else {
               GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-              LOG.error(
+              LOG.info(
                   "Index with record timestamp %s, window timestamp %s, pane %s",
                   c.timestamp(), window.maxTimestamp(), c.pane());
 
               @Nullable String filename = indexPathFor(window);
               if (filename != null) {
-                LOG.error("Beginning write to '%s'", filename);
+                LOG.info("Beginning write to '%s'", filename);
                 int n = 0;
                 try (OutputStream output =
                          Channels.newOutputStream(
                              openWritableGcsFile(options, filename))) {
                   for (OutputFile outputFile : c.element().getValue()) {
-                    output.write(outputFile.toString().getBytes());
+                    output.write(outputFile.toString().getBytes("UTF-8"));
                     n++;
                   }
                 }
-                LOG.error("Written all %d lines to '%s'", n, filename);
+                LOG.info("Written all %d lines to '%s'", n, filename);
               }
               c.output(
                   new Done("written for timestamp " + window.maxTimestamp()));