You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:45 UTC
[37/55] [abbrv] beam git commit: Fix query10 log messages
Fix query10 log messages
Addresses issue #5 and issue #51.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee500b28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee500b28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee500b28
Branch: refs/heads/master
Commit: ee500b28086f1261101395dc0b7b23f197ba19d9
Parents: 3d5c3d0
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 30 18:00:00 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 6 +++
.../integration/nexmark/queries/Query10.java | 39 ++++++++------------
2 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 86b88bd..664a410 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -255,6 +255,12 @@
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index c868666..378d01e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.integration.nexmark.queries;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
@@ -33,9 +30,9 @@ import org.apache.beam.integration.nexmark.model.Done;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
@@ -57,7 +54,6 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Query "10", 'Log to sharded files' (Not in original suite.)
*
@@ -132,12 +128,9 @@ public class Query10 extends NexmarkQuery {
*/
private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
throws IOException {
- //TODO Decide what to do about this one
-// WritableByteChannel channel =
-// GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
-// checkState(channel instanceof GoogleCloudStorageWriteChannel);
-// ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
-// return channel;
+ //TODO
+ // Fix after PR: right now this is a specific Google added use case
+ // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
}
@@ -192,7 +185,7 @@ public class Query10 extends NexmarkQuery {
public void processElement(ProcessContext c) {
if (c.element().hasAnnotation("LATE")) {
lateCounter.inc();
- LOG.error("Observed late: %s", c.element());
+ LOG.info("Observed late: %s", c.element());
} else {
onTimeCounter.inc();
}
@@ -240,11 +233,11 @@ public class Query10 extends NexmarkQuery {
}
}
String shard = c.element().getKey();
- LOG.error(
+ LOG.info(String.format(
"%s with timestamp %s has %d actually late and %d on-time "
+ "elements in pane %s for window %s",
shard, c.timestamp(), numLate, numOnTime, c.pane(),
- window.maxTimestamp());
+ window.maxTimestamp()));
if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
if (numLate == 0) {
LOG.error(
@@ -283,11 +276,11 @@ public class Query10 extends NexmarkQuery {
String shard = c.element().getKey();
GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
OutputFile outputFile = outputFileFor(window, shard, c.pane());
- LOG.error(
+ LOG.info(String.format(
"Writing %s with record timestamp %s, window timestamp %s, pane %s",
- shard, c.timestamp(), window.maxTimestamp(), c.pane());
+ shard, c.timestamp(), window.maxTimestamp(), c.pane()));
if (outputFile.filename != null) {
- LOG.error("Beginning write to '%s'", outputFile.filename);
+ LOG.info("Beginning write to '%s'", outputFile.filename);
int n = 0;
try (OutputStream output =
Channels.newOutputStream(openWritableGcsFile(options, outputFile
@@ -296,12 +289,12 @@ public class Query10 extends NexmarkQuery {
Event.CODER.encode(event, output, Coder.Context.OUTER);
writtenRecordsCounter.inc();
if (++n % 10000 == 0) {
- LOG.error("So far written %d records to '%s'", n,
+ LOG.info("So far written %d records to '%s'", n,
outputFile.filename);
}
}
}
- LOG.error("Written all %d records to '%s'", n, outputFile.filename);
+ LOG.info("Written all %d records to '%s'", n, outputFile.filename);
}
savedFileCounter.inc();
c.output(KV.<Void, OutputFile>of(null, outputFile));
@@ -341,23 +334,23 @@ public class Query10 extends NexmarkQuery {
LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
} else {
GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
- LOG.error(
+ LOG.info(
"Index with record timestamp %s, window timestamp %s, pane %s",
c.timestamp(), window.maxTimestamp(), c.pane());
@Nullable String filename = indexPathFor(window);
if (filename != null) {
- LOG.error("Beginning write to '%s'", filename);
+ LOG.info("Beginning write to '%s'", filename);
int n = 0;
try (OutputStream output =
Channels.newOutputStream(
openWritableGcsFile(options, filename))) {
for (OutputFile outputFile : c.element().getValue()) {
- output.write(outputFile.toString().getBytes());
+ output.write(outputFile.toString().getBytes("UTF-8"));
n++;
}
}
- LOG.error("Written all %d lines to '%s'", n, filename);
+ LOG.info("Written all %d lines to '%s'", n, filename);
}
c.output(
new Done("written for timestamp " + window.maxTimestamp()));