You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/04 06:30:00 UTC

[jira] [Work logged] (BEAM-4048) Refactor COMBINE mode for reading and writing from/to Pub/Sub and Kafka in Nexmark

     [ https://issues.apache.org/jira/browse/BEAM-4048?focusedWorklogId=98132&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98132 ]

ASF GitHub Bot logged work on BEAM-4048:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/May/18 06:29
            Start Date: 04/May/18 06:29
    Worklog Time Spent: 10m 
      Work Description: rangadi commented on a change in pull request #5249: [BEAM-4048] Refactor COMBINE mode for reading/writing from/to Pub/Sub and Kafka in Nexmark
URL: https://github.com/apache/beam/pull/5249#discussion_r185996349
 
 

 ##########
 File path: sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 ##########
 @@ -760,66 +763,69 @@ private String logsDir(long now) {
   }
 
   static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
-          new DoFn<Event, byte[]>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              try {
-                byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
-                c.output(encodedEvent);
-              } catch (CoderException e1) {
-                LOG.error("Error while sending Event {} to Kafka: serialization error",
-                        c.element().toString());
-              }
-            }
-          };
+      new DoFn<Event, byte[]>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          try {
+            byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
+            c.output(encodedEvent);
+          } catch (CoderException e1) {
+            LOG.error(
+                "Error while sending Event {} to Kafka: serialization error",
+                c.element().toString());
+          }
+        }
+      };
 
   /**
    * Send {@code events} to Kafka.
    */
   private void sinkEventsToKafka(PCollection<Event> events) {
-    PCollection<byte[]> eventToBytes =
-        events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
-    eventToBytes.apply(KafkaIO.<Void, byte[]>write()
-                    .withBootstrapServers(options.getBootstrapServers())
-                    .withTopic(options.getKafkaSinkTopic())
-                    .withValueSerializer(ByteArraySerializer.class)
-                    .values());
+    NexmarkUtils.console("Writing events to Kafka Topic %s", options.getKafkaEventsTopic());
+    checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers");
 
+    PCollection<byte[]> eventToBytes = events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
+    eventToBytes.apply(
+        KafkaIO.<Void, byte[]>write()
+            .withBootstrapServers(options.getBootstrapServers())
+            .withTopic(options.getKafkaEventsTopic())
+            .withValueSerializer(ByteArraySerializer.class)
+            .values());
   }
 
-
   static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
-          new DoFn<KV<Long, byte[]>, Event>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              byte[] encodedEvent = c.element().getValue();
-              try {
-                Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
-                c.output(event);
-              } catch (CoderException e) {
-                LOG.error("Error while decoding Event from Kafka message: serialization error");
-              }
-            }
-          };
+      new DoFn<KV<Long, byte[]>, Event>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          byte[] encodedEvent = c.element().getValue();
+          try {
+            Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
+            c.output(event);
+          } catch (CoderException e) {
+            LOG.error("Error while decoding Event from Kafka message: serialization error");
+          }
+        }
+      };
 
   /**
    * Return source of events from Kafka.
    */
-  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
-    NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());
-
-    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
-        "Missing --bootstrapServers");
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p, final Instant now) {
+    NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaEventsTopic());
+    checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers");
 
-    KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
+    KafkaIO.Read<Long, byte[]> read =
+        KafkaIO.<Long, byte[]>read()
             .withBootstrapServers(options.getBootstrapServers())
-            .withTopic(options.getKafkaSourceTopic())
+            .withTopic(options.getKafkaEventsTopic())
             .withKeyDeserializer(LongDeserializer.class)
-            .withValueDeserializer(ByteArrayDeserializer.class);
+            .withValueDeserializer(ByteArrayDeserializer.class)
+            .withStartReadTime(now)
+            .withMaxNumRecords(
+                options.getNumEvents() != null ? options.getNumEvents() : Long.MAX_VALUE);
 
-    return p
-      .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
-      .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+    return p.apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
+            .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
 
 Review comment:
   I think the earlier indentation is more consistent with the rest of Beam ('.' with 4 spaces).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 98132)
    Time Spent: 50m  (was: 40m)

> Refactor COMBINE mode for reading and writing from/to Pub/Sub and Kafka in Nexmark
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-4048
>                 URL: https://issues.apache.org/jira/browse/BEAM-4048
>             Project: Beam
>          Issue Type: Improvement
>          Components: examples-nexmark
>            Reporter: Etienne Chauchot
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)