You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/22 02:34:56 UTC
[2/5] incubator-beam git commit: Kafka: various fixes
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
deleted file mode 100644
index b4bdc83..0000000
--- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Tests of {@link KafkaSource}.
- */
-@RunWith(JUnit4.class)
-public class KafkaIOTest {
- /*
- * The tests below borrow code and structure from CountingSourceTest. In addition verifies
- * the reader interleaves the records from multiple partitions.
- *
- * Other tests to consider :
- * - test KafkaRecordCoder
- */
-
- // Update mock consumer with records distributed among the given topics, each with given number
- // of partitions. Records are assigned in round-robin order among the partitions.
- private static MockConsumer<byte[], byte[]> mkMockConsumer(
- List<String> topics, int partitionsPerTopic, int numElements) {
-
- final List<TopicPartition> partitions = new ArrayList<>();
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
- Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
-
- for (String topic : topics) {
- List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
- for (int i = 0; i < partitionsPerTopic; i++) {
- partitions.add(new TopicPartition(topic, i));
- partIds.add(new PartitionInfo(topic, i, null, null, null));
- }
- partitionMap.put(topic, partIds);
- }
-
- int numPartitions = partitions.size();
- long[] offsets = new long[numPartitions];
-
- for (int i = 0; i < numElements; i++) {
- int pIdx = i % numPartitions;
- TopicPartition tp = partitions.get(pIdx);
-
- if (!records.containsKey(tp)) {
- records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
- }
- records.get(tp).add(
- // Note: this interface has changed in 0.10. may get fixed before the release.
- new ConsumerRecord<byte[], byte[]>(
- tp.topic(),
- tp.partition(),
- offsets[pIdx]++,
- null, // key
- ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
- }
-
- MockConsumer<byte[], byte[]> consumer =
- new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
- // override assign() to add records that belong to the assigned partitions.
- public void assign(List<TopicPartition> assigned) {
- super.assign(assigned);
- for (TopicPartition tp : assigned) {
- for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
- addRecord(r);
- }
- updateBeginningOffsets(ImmutableMap.of(tp, 0L));
- updateEndOffsets(ImmutableMap.of(tp, (long)records.get(tp).size()));
- seek(tp, 0);
- }
- }
- };
-
- for (String topic : topics) {
- consumer.updatePartitions(topic, partitionMap.get(topic));
- }
-
- return consumer;
- }
-
- private static class ConsumerFactoryFn
- implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
- private final List<String> topics;
- private final int partitionsPerTopic;
- private final int numElements;
-
- public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
- this.topics = topics;
- this.partitionsPerTopic = partitionsPerTopic;
- this.numElements = numElements;
- }
-
- public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
- return mkMockConsumer(topics, partitionsPerTopic, numElements);
- }
- }
-
- /**
- * Creates a consumer with two topics, with 5 partitions each.
- * numElements are (round-robin) assigned all the 10 partitions.
- */
- private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
- int numElements,
- @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
-
- List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
- KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
- .withBootstrapServers("none")
- .withTopics(topics)
- .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
- .withValueCoder(BigEndianLongCoder.of())
- .withMaxNumRecords(numElements);
-
- if (timestampFn != null) {
- return reader.withTimestampFn(timestampFn);
- } else {
- return reader;
- }
- }
-
- private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
- private final int num;
-
- public AssertMultipleOf(int num) {
- this.num = num;
- }
-
- @Override
- public Void apply(Iterable<Long> values) {
- for (Long v : values) {
- assertEquals(0, v % num);
- }
- return null;
- }
- }
-
- public static void addCountingAsserts(PCollection<Long> input, long numElements) {
- // Count == numElements
- DataflowAssert
- .thatSingleton(input.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Unique count == numElements
- DataflowAssert
- .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
- .apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Min == 0
- DataflowAssert
- .thatSingleton(input.apply("Min", Min.<Long>globally()))
- .isEqualTo(0L);
- // Max == numElements-1
- DataflowAssert
- .thatSingleton(input.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedSource() {
- Pipeline p = TestPipeline.create();
- int numElements = 1000;
-
- PCollection<Long> input = p
- .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
- .withoutMetadata())
- .apply(Values.<Long>create());
-
- addCountingAsserts(input, numElements);
- p.run();
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedSourceWithExplicitPartitions() {
- Pipeline p = TestPipeline.create();
- int numElements = 1000;
-
- List<String> topics = ImmutableList.of("test");
-
- KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
- .withBootstrapServers("none")
- .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
- .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
- .withValueCoder(BigEndianLongCoder.of())
- .withMaxNumRecords(numElements / 10);
-
- PCollection<Long> input = p
- .apply(reader.withoutMetadata())
- .apply(Values.<Long>create());
-
- // assert that every element is a multiple of 5.
- DataflowAssert
- .that(input)
- .satisfies(new AssertMultipleOf(5));
-
- DataflowAssert
- .thatSingleton(input.apply(Count.<Long>globally()))
- .isEqualTo(numElements / 10L);
-
- p.run();
- }
-
- private static class ElementValueDiff extends DoFn<Long, Long> {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element() - c.timestamp().getMillis());
- }
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedSourceTimestamps() {
- Pipeline p = TestPipeline.create();
- int numElements = 1000;
-
- PCollection<Long> input = p
- .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
- .apply(Values.<Long>create());
-
- addCountingAsserts(input, numElements);
-
- PCollection<Long> diffs = input
- .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
- // This assert also confirms that diffs only has one unique value.
- DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
-
- p.run();
- }
-
- private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
- @Override
- public void processElement(ProcessContext ctx) throws Exception {
- ctx.output(ctx.element().getKV());
- }
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedSourceSplits() throws Exception {
- Pipeline p = TestPipeline.create();
- int numElements = 1000;
- int numSplits = 10;
-
- UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
- mkKafkaReadTransform(numElements, null).makeSource();
- List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
- initial.generateInitialSplits(numSplits, p.getOptions());
- assertEquals("Expected exact splitting", numSplits, splits.size());
-
- long elementsPerSplit = numElements / numSplits;
- assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
- PCollectionList<Long> pcollections = PCollectionList.empty(p);
- for (int i = 0; i < splits.size(); ++i) {
- pcollections = pcollections.and(
- p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
- .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
- .apply("collection " + i, Values.<Long>create()));
- }
- PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
-
- addCountingAsserts(input, numElements);
- p.run();
- }
-
- /**
- * A timestamp function that uses the given value as the timestamp.
- */
- private static class ValueAsTimestampFn
- implements SerializableFunction<KV<byte[], Long>, Instant> {
- @Override
- public Instant apply(KV<byte[], Long> input) {
- return new Instant(input.getValue());
- }
- }
-
- @Test
- public void testUnboundedSourceCheckpointMark() throws Exception {
- int numElements = 85; // 85 to make sure some partitions have more records than other.
-
- // create a single split:
- UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
- mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
- .makeSource()
- .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
- .get(0);
-
- UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
- final int numToSkip = 3;
- // advance once:
- assertTrue(reader.start());
-
- // Advance the source numToSkip-1 elements and manually save state.
- for (long l = 0; l < numToSkip - 1; ++l) {
- assertTrue(reader.advance());
- }
-
- // Confirm that we get the expected element in sequence before checkpointing.
-
- assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
- assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
-
- // Checkpoint and restart, and confirm that the source continues correctly.
- KafkaCheckpointMark mark = CoderUtils.clone(
- source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
- reader = source.createReader(null, mark);
- assertTrue(reader.start());
-
- // Confirm that we get the next elements in sequence.
- // This also confirms that Reader interleaves records from each partitions by the reader.
- for (int i = numToSkip; i < numElements; i++) {
- assertEquals(i, (long) reader.getCurrent().getKV().getValue());
- assertEquals(i, reader.getCurrentTimestamp().getMillis());
- reader.advance();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
new file mode 100644
index 0000000..98a091d
--- /dev/null
+++ b/sdks/java/io/kafka/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>io-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>kafka</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Kafka</name>
+ <description>Library to read Kafka topics.</description>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.12</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.6</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>../../checkstyle.xml</configLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>[0.9,)</version>
+ </dependency>
+
+ <!-- test dependencies-->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>${hamcrest.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
new file mode 100644
index 0000000..4b6b976
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, partition id,
+ * and the latest offset consumed so far.
+ */
+@DefaultCoder(SerializableCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+ private final List<PartitionMark> partitions;
+
+ public KafkaCheckpointMark(List<PartitionMark> partitions) {
+ this.partitions = partitions;
+ }
+
+ public List<PartitionMark> getPartitions() {
+ return partitions;
+ }
+
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+ /* nothing to do */
+
+ // We might want to support committing offset in Kafka for better resume point when the job
+ // is restarted (checkpoint is not available for job restarts).
+ }
+
+ /**
+ * A tuple to hold topic, partition, and offset that comprise the checkpoint
+ * for a single partition.
+ */
+ public static class PartitionMark implements Serializable {
+ private final TopicPartition topicPartition;
+ private final long offset;
+
+ public PartitionMark(TopicPartition topicPartition, long offset) {
+ this.topicPartition = topicPartition;
+ this.offset = offset;
+ }
+
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
new file mode 100644
index 0000000..e605311
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka version 0.9
+ * and above are supported.
+ *
+ * <h3>Reading from Kafka topics</h3>
+ *
+ * <p>KafkaIO source returns unbounded collection of Kafka records as
+ * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic
+ * metadata like topic-partition and offset, along with key and value associated with a Kafka
+ * record.
+ *
+ * <p>Although most applications consume a single topic, the source can be configured to consume
+ * multiple topics or even a specific set of {@link TopicPartition}s.
+ *
+ * <p> To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
+ * and one or more topics to consume. The following example illustrates various options for
+ * configuring the source :
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ * .apply(KafkaIO.read()
+ * .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ * // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>
+ *
+ * // rest of the settings are optional :
+ *
+ * // set a Coder for Key and Value (note the change to return type)
+ * .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
+ * .withValueCoder(StringUtf8Coder.of()) // PCollection<KafkaRecord<Long, String>
+ *
+ * // you can further customize KafkaConsumer used to read the records by adding more
+ * // settings for ConsumerConfig. e.g :
+ * .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
+ *
+ * // custom function for calculating record timestamp (default is processing time)
+ * .withTimestampFn(new MyTimestampFunction())
+ *
+ * // custom function for watermark (default is record timestamp)
+ * .withWatermarkFn(new MyWatermarkFunction())
+ *
+ * // finally, if you don't need Kafka metadata, you can drop it
+ * .withoutMetadata() // PCollection<KV<Long, String>>
+ * )
+ * .apply(Values.<String>create()) // PCollection<String>
+ * ...
+ * }</pre>
+ *
+ * <h3>Partition Assignment and Checkpointing</h3>
+ * The Kafka partitions are evenly distributed among splits (workers).
+ * Dataflow checkpointing is fully supported and
+ * each split can resume from previous checkpoint. See
+ * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
+ * splits and checkpoint support.
+ *
+ * <p>When the pipeline starts for the first time without any checkpoint, the source starts
+ * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
+ * beginning by setting appropriate properties in {@link ConsumerConfig}, through
+ * {@link Read#updateConsumerProperties(Map)}.
+ *
+ * <h3>Advanced Kafka Configuration</h3>
+ * KafkaIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like
+ * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set
+ * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ */
+public class KafkaIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
+
+ private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> {
+ @Override
+ public Instant apply(T input) {
+ return Instant.now();
+ }
+ }
+
+
+ /**
+ * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
+ * configuration should be set with {@link Read#withBootstrapServers(String)} and
+ * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
+ * custom timestamp and watermark functions.
+ */
+ public static Read<byte[], byte[]> read() {
+ return new Read<byte[], byte[]>(
+ new ArrayList<String>(),
+ new ArrayList<TopicPartition>(),
+ ByteArrayCoder.of(),
+ ByteArrayCoder.of(),
+ Read.KAFKA_9_CONSUMER_FACTORY_FN,
+ Read.DEFAULT_CONSUMER_PROPERTIES,
+ Long.MAX_VALUE,
+ null);
+ }
+
+ /**
+ * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+ * information on usage and configuration.
+ */
+ public static class Read<K, V> extends TypedRead<K, V> {
+
+ /**
+ * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+ */
+ public Read<K, V> withBootstrapServers(String bootstrapServers) {
+ return updateConsumerProperties(
+ ImmutableMap.<String, Object>of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+ }
+
+ /**
+ * Returns a new {@link Read} that reads from the topics. All the partitions from each
+ * of the topics are read.
+ * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+ * of how the partitions are distributed among the splits.
+ */
+ public Read<K, V> withTopics(List<String> topics) {
+ checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+ return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+ * of partitions for one or more topics when (if ever) needed.
+ * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+ * of how the partitions are distributed among the splits.
+ */
+ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
+ checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+ return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns a new {@link Read} with {@link Coder} for key bytes.
+ */
+ public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
+ return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns a new {@link Read} with {@link Coder} for value bytes.
+ */
+ public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
+ return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * A factory to create Kafka {@link Consumer} from consumer configuration.
+ * This is useful for supporting another version of Kafka consumer.
+ * Default is {@link KafkaConsumer}.
+ */
+ public Read<K, V> withConsumerFactoryFn(
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+ return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Update consumer configuration with new properties.
+ */
+ public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
+ for (String key : configUpdates.keySet()) {
+ checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key),
+ "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key));
+ }
+
+ Map<String, Object> config = new HashMap<>(consumerConfig);
+ config.putAll(configUpdates);
+
+ return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, config, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}.
+ * Mainly used for tests and demo applications.
+ *
+ * <p>Note: this resets any previously configured {@code maxReadTime} to null, so the
+ * resulting bounded read is limited by record count only.
+ */
+ public Read<K, V> withMaxNumRecords(long maxNumRecords) {
+ return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, maxNumRecords, null);
+ }
+
+ /**
+ * Similar to
+ * {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
+ * Mainly used for tests and demo applications.
+ *
+ * <p>Note: this resets {@code maxNumRecords} to {@code Long.MAX_VALUE}, so the resulting
+ * bounded read is limited by elapsed time only.
+ */
+ public Read<K, V> withMaxReadTime(Duration maxReadTime) {
+ return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ // Private constructor: callers build instances through KafkaIO.read() and the with* methods.
+ private Read(
+ List<String> topics,
+ List<TopicPartition> topicPartitions,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+ Map<String, Object> consumerConfig,
+ long maxNumRecords,
+ @Nullable Duration maxReadTime) {
+
+ // The two nulls are timestampFn and watermarkFn; Read uses the superclass defaults
+ // (processing-time timestamps) until TypedRead's with*Fn methods set them.
+ super(topics, topicPartitions, keyCoder, valueCoder, null, null,
+ consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * A set of properties that are not required or don't make sense for our consumer.
+ */
+ private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead"
+ // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
+ // lets allow these, applications can have better resume point for restarts.
+ );
+
+ // set config defaults
+ private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+ ImmutableMap.<String, Object>of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+
+ // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
+ // with default value of of 32K, It takes multiple seconds between successful polls.
+ // All the consumer work is done inside poll(), with smaller send buffer size, it
+ // takes many polls before a 1MB chunk from the server is fully read. In my testing
+ // about half of the time select() inside kafka consumer waited for 20-30ms, though
+ // the server had lots of data in tcp send buffers on its side. Compared to default,
+ // this setting increased throughput increased by many fold (3-4x).
+ ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
+
+ // default to latest offset when we are not resuming.
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
+ // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ // Default consumer factory: creates a standard Kafka 0.9 KafkaConsumer from the config map.
+ private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+ KAFKA_9_CONSUMER_FACTORY_FN =
+ new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+ @Override
+ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+ return new KafkaConsumer<>(config);
+ }
+ };
+ }
+
+ /**
+ * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+ * information on usage and configuration.
+ */
+ public static class TypedRead<K, V>
+ extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+
+ /**
+ * Returns a new {@link TypedRead} that assigns each record's timestamp with the given
+ * function, which sees the full {@link KafkaRecord} including Kafka metadata.
+ * Default is processing timestamp.
+ */
+ public TypedRead<K, V> withTimestampFn2(
+ SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+ checkNotNull(timestampFn);
+ return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+ maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns a new {@link TypedRead} that computes the watermark after each record with the
+ * given function, which sees the full {@link KafkaRecord} including Kafka metadata.
+ * Default is the last record's timestamp.
+ * @see #withTimestampFn(SerializableFunction)
+ */
+ public TypedRead<K, V> withWatermarkFn2(
+ SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
+ checkNotNull(watermarkFn);
+ return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+ timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+ maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns a new {@link TypedRead} that assigns each record's timestamp with the given
+ * function over the record's {@link KV} (Kafka metadata stripped).
+ * Default is processing timestamp.
+ */
+ public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
+ checkNotNull(timestampFn);
+ return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
+ }
+
+ /**
+ * Returns a new {@link TypedRead} that computes the watermark after each record with the
+ * given function over the record's {@link KV} (Kafka metadata stripped).
+ * Default is the last record's timestamp.
+ * @see #withTimestampFn(SerializableFunction)
+ */
+ public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
+ checkNotNull(watermarkFn);
+ return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
+ }
+
+ /**
+ * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata
+ * (topic name, partition id, and offset).
+ */
+ public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
+ return new TypedWithoutMetadata<K, V>(this);
+ }
+
+ @Override
+ public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+ // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+ Unbounded<KafkaRecord<K, V>> unbounded =
+ org.apache.beam.sdk.io.Read.from(makeSource());
+
+ PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+ // maxNumRecords takes precedence over maxReadTime when both are configured.
+ if (maxNumRecords < Long.MAX_VALUE) {
+ transform = unbounded.withMaxNumRecords(maxNumRecords);
+ } else if (maxReadTime != null) {
+ transform = unbounded.withMaxReadTime(maxReadTime);
+ }
+
+ return input.getPipeline().apply(transform);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////
+
+ protected final List<String> topics;
+ protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics
+ protected final Coder<K> keyCoder;
+ protected final Coder<V> valueCoder;
+ @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+ @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn;
+ protected final
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+ protected final Map<String, Object> consumerConfig;
+ protected final long maxNumRecords; // bounded read, mainly for testing
+ protected final Duration maxReadTime; // bounded read, mainly for testing
+
+ // Private constructor: instances are built via KafkaIO.read() and the with* methods above.
+ private TypedRead(List<String> topics,
+ List<TopicPartition> topicPartitions,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+ @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+ Map<String, Object> consumerConfig,
+ long maxNumRecords,
+ @Nullable Duration maxReadTime) {
+ super("KafkaIO.Read");
+
+ this.topics = topics;
+ this.topicPartitions = topicPartitions;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.timestampFn = timestampFn;
+ this.watermarkFn = watermarkFn;
+ this.consumerFactoryFn = consumerFactoryFn;
+ this.consumerConfig = consumerConfig;
+ this.maxNumRecords = maxNumRecords;
+ this.maxReadTime = maxReadTime;
+ }
+
+ /**
+ * Creates an {@code UnboundedSource<KafkaRecord<K, V>, ?>} with the configuration in
+ * {@link TypedRead}. Primary use case is unit tests, should not be used in an
+ * application.
+ */
+ @VisibleForTesting
+ UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
+ return new UnboundedKafkaSource<K, V>(
+ -1,
+ // -1 marks a source that has not been split yet; generateInitialSplits assigns real ids.
+ topics,
+ topicPartitions,
+ keyCoder,
+ valueCoder,
+ timestampFn,
+ Optional.fromNullable(watermarkFn),
+ consumerFactoryFn,
+ consumerConfig);
+ }
+
+ // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
+ private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
+ unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
+ return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+ @Override
+ public OutT apply(KafkaRecord<KeyT, ValueT> record) {
+ return fn.apply(record.getKV());
+ }
+ };
+ }
+ }
+
+ /**
+ * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.TypedRead}, but
+ * removes Kafka metadata and returns a {@link PCollection} of {@link KV}.
+ * See {@link KafkaIO} for more information on usage and configuration of reader.
+ */
+ public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+ // the underlying metadata-preserving read this transform delegates to
+ private final TypedRead<K, V> typedRead;
+
+ TypedWithoutMetadata(TypedRead<K, V> read) {
+ super("KafkaIO.Read");
+ this.typedRead = read;
+ }
+
+ @Override
+ public PCollection<KV<K, V>> apply(PBegin begin) {
+ // read KafkaRecords, then project each down to its KV payload.
+ return typedRead
+ .apply(begin)
+ .apply("Remove Kafka Metadata",
+ ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+ @Override
+ public void processElement(ProcessContext ctx) {
+ ctx.output(ctx.element().getKV());
+ }
+ }));
+ }
+ }
+
+ /** Static class, prevent instantiation. */
+ private KafkaIO() {}
+
+ private static class UnboundedKafkaSource<K, V>
+ extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
+
+ private final int id; // split id, mainly for debugging
+ private final List<String> topics;
+ private final List<TopicPartition> assignedPartitions;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+ // would it be a good idea to pass currentTimestamp to watermarkFn?
+ private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn;
+ private
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+ private final Map<String, Object> consumerConfig;
+
+ public UnboundedKafkaSource(
+ int id,
+ List<String> topics,
+ List<TopicPartition> assignedPartitions,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+ Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+ Map<String, Object> consumerConfig) {
+
+ this.id = id;
+ this.assignedPartitions = assignedPartitions;
+ this.topics = topics;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.timestampFn =
+ (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn);
+ this.watermarkFn = watermarkFn;
+ this.consumerFactoryFn = consumerFactoryFn;
+ this.consumerConfig = consumerConfig;
+ }
+
+ /**
+ * The partitions are evenly distributed among the splits. The number of splits returned is
+ * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
+ * count.
+ *
+ * <p> It is important to assign the partitions deterministically so that we can support
+ * resuming a split from last checkpoint. The Kafka partitions are sorted by
+ * {@code <topic, partition>} and then assigned to splits in round-robin order.
+ */
+ @Override
+ public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+
+ List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
+
+ // (a) fetch partitions for each topic
+ // (b) sort by <topic, partition>
+ // (c) round-robin assign the partitions to splits
+
+ // partitions are pre-assigned only when the user configured explicit topicPartitions;
+ // otherwise discover them from Kafka for the configured topics.
+ if (partitions.isEmpty()) {
+ try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
+ for (String topic : topics) {
+ for (PartitionInfo p : consumer.partitionsFor(topic)) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
+ }
+ }
+ }
+
+ Collections.sort(partitions, new Comparator<TopicPartition>() {
+ @Override
+ public int compare(TopicPartition tp1, TopicPartition tp2) {
+ return ComparisonChain
+ .start()
+ .compare(tp1.topic(), tp2.topic())
+ .compare(tp1.partition(), tp2.partition())
+ .result();
+ }
+ });
+
+ checkArgument(desiredNumSplits > 0);
+ checkState(partitions.size() > 0,
+ "Could not find any partitions. Please check Kafka configuration and topic names");
+
+ int numSplits = Math.min(desiredNumSplits, partitions.size());
+ List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
+
+ // round-robin: partition i goes to split (i % numSplits).
+ for (int i = 0; i < numSplits; i++) {
+ assignments.add(new ArrayList<TopicPartition>());
+ }
+ for (int i = 0; i < partitions.size(); i++) {
+ assignments.get(i % numSplits).add(partitions.get(i));
+ }
+
+ List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
+
+ for (int i = 0; i < numSplits; i++) {
+ List<TopicPartition> assignedToSplit = assignments.get(i);
+
+ LOG.info("Partitions assigned to split {} (total {}): {}",
+ i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
+
+ result.add(new UnboundedKafkaSource<K, V>(
+ i,
+ this.topics,
+ assignedToSplit,
+ this.keyCoder,
+ this.valueCoder,
+ this.timestampFn,
+ this.watermarkFn,
+ this.consumerFactoryFn,
+ this.consumerConfig));
+ }
+
+ return result;
+ }
+
+ @Override
+ public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
+ KafkaCheckpointMark checkpointMark) {
+ if (assignedPartitions.isEmpty()) {
+ LOG.warn("Looks like generateSplits() is not called. Generate single split.");
+ try {
+ return new UnboundedKafkaReader<K, V>(
+ generateInitialSplits(1, options).get(0), checkpointMark);
+ } catch (Exception e) {
+ // 'throw' is required: propagate() always throws, but without it the compiler
+ // lets control fall through to the return below, which would hand out a reader
+ // with no assigned partitions.
+ throw Throwables.propagate(e);
+ }
+ }
+ return new UnboundedKafkaReader<K, V>(this, checkpointMark);
+ }
+
+ @Override
+ public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
+ return SerializableCoder.of(KafkaCheckpointMark.class);
+ }
+
+ @Override
+ public boolean requiresDeduping() {
+ // Kafka records are ordered within partitions. In addition the checkpoint guarantees
+ // records are not consumed twice.
+ return false;
+ }
+
+ @Override
+ public void validate() {
+ // bootstrap servers and at least one of topics/topicPartitions are the minimum config.
+ checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ "Kafka bootstrap servers should be set");
+ checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
+ "Kafka topics or topic_partitions are required");
+ }
+
+ @Override
+ public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+ return KafkaRecordCoder.of(keyCoder, valueCoder);
+ }
+ }
+
+ private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+
+ private final UnboundedKafkaSource<K, V> source;
+ private final String name;
+ private Consumer<byte[], byte[]> consumer;
+ private final List<PartitionState> partitionStates;
+ private KafkaRecord<K, V> curRecord;
+ private Instant curTimestamp;
+ private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+
+ private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+ // how long to wait for new records from kafka consumer inside advance()
+ private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
+
+ // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
+ // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
+ // like 100 milliseconds does not work well. This along with large receive buffer for
+ // consumer achieved best throughput in tests (see `defaultConsumerProperties`).
+ private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
+ private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
+ new SynchronousQueue<>();
+ private volatile boolean closed = false;
+
+ // Backlog support :
+ // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
+ // then look at position(). Use another consumer to do this so that the primary consumer does
+ // not need to be interrupted. The latest offsets are fetched periodically on another thread.
+ // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
+ // auto commit in consumer config, this could interfere with the primary consumer (we will
+ // handle this particular problem). We might have to make this optional.
+ private Consumer<byte[], byte[]> offsetConsumer;
+ private final ScheduledExecutorService offsetFetcherThread =
+ Executors.newSingleThreadScheduledExecutor();
+ private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+
+ /** watermark before any records have been read. */
+ private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+
+ public String toString() {
+ return name;
+ }
+
+ // maintains state of each assigned partition (buffered records, consumed offset, etc)
+ private static class PartitionState {
+ private final TopicPartition topicPartition;
+ // offset of the last record handed to the reader; -1 until a record (or checkpoint) sets it
+ private long consumedOffset;
+ // latest broker offset, refreshed by the offset-fetcher thread; -1 means unknown
+ private long latestOffset;
+ private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
+
+ // simple moving average for size of each record in bytes
+ private double avgRecordSize = 0;
+ private static final int MOVING_AVG_WINDOW = 1000; // very roughly avg of last 1000 elements
+
+ PartitionState(TopicPartition partition, long offset) {
+ this.topicPartition = partition;
+ this.consumedOffset = offset;
+ this.latestOffset = -1;
+ }
+
+ // update consumedOffset and avgRecordSize
+ // NOTE(review): consumedOffset is written here without synchronization but read inside
+ // the synchronized approxBacklogInBytes() from the offset-fetcher thread -- confirm a
+ // slightly stale read is acceptable for the backlog estimate.
+ void recordConsumed(long offset, int size) {
+ consumedOffset = offset;
+
+ // this is always updated from single thread. probably not worth making it an AtomicDouble
+ if (avgRecordSize <= 0) {
+ avgRecordSize = size;
+ } else {
+ // initially, first record heavily contributes to average.
+ avgRecordSize += ((size - avgRecordSize) / MOVING_AVG_WINDOW);
+ }
+ }
+
+ synchronized void setLatestOffset(long latestOffset) {
+ this.latestOffset = latestOffset;
+ }
+
+ synchronized long approxBacklogInBytes() {
+ // Note that this is an estimate of uncompressed backlog.
+ // Messages on Kafka might be compressed.
+ if (latestOffset < 0 || consumedOffset < 0) {
+ return UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ // consumedOffset < 0 is already handled above; only the ordering check remains.
+ if (latestOffset <= consumedOffset) {
+ return 0;
+ }
+ return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+ }
+ }
+
+ public UnboundedKafkaReader(
+ UnboundedKafkaSource<K, V> source,
+ @Nullable KafkaCheckpointMark checkpointMark) {
+
+ this.source = source;
+ this.name = "Reader-" + source.id;
+
+ partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
+ new Function<TopicPartition, PartitionState>() {
+ public PartitionState apply(TopicPartition tp) {
+ return new PartitionState(tp, -1L);
+ }
+ }));
+
+ if (checkpointMark != null) {
+ // a) verify that assigned and check-pointed partitions match exactly
+ // b) set consumed offsets
+
+ checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(),
+ "checkPointMark and assignedPartitions should match");
+ // we could consider allowing a mismatch, though it is not expected in current Dataflow
+
+ for (int i = 0; i < source.assignedPartitions.size(); i++) {
+ PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
+ TopicPartition assigned = source.assignedPartitions.get(i);
+
+ checkState(ckptMark.getTopicPartition().equals(assigned),
+ "checkpointed partition %s and assigned partition %s don't match",
+ ckptMark.getTopicPartition(), assigned);
+
+ partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+ }
+ }
+ }
+
+ // Body of the dedicated consumer thread: polls Kafka in a loop and hands each non-empty
+ // batch to the reader thread via availableRecordsQueue (a SynchronousQueue, so put()
+ // blocks until the reader takes the batch). Exits when close() sets 'closed' and calls
+ // consumer.wakeup(), which makes poll() throw WakeupException.
+ private void consumerPollLoop() {
+ // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
+ while (!closed) {
+ try {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+ if (!records.isEmpty()) {
+ availableRecordsQueue.put(records); // blocks until dequeued.
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
+ Thread.currentThread().interrupt(); // restore interrupt status before exiting
+ break;
+ } catch (WakeupException e) {
+ break;
+ }
+ }
+
+ LOG.info("{}: Returning from consumer poll loop", this);
+ }
+
+ // Fetches the next batch handed over by the consumer thread (waiting up to
+ // NEW_RECORDS_POLL_TIMEOUT) and rebuilds curBatch as a cycling iterator over the
+ // partitions that received records. Leaves curBatch empty if nothing arrived in time.
+ private void nextBatch() {
+ curBatch = Collections.emptyIterator();
+
+ ConsumerRecords<byte[], byte[]> records;
+ try {
+ records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("{}: Unexpected", this, e);
+ return;
+ }
+
+ // timed out without a new batch; caller will treat this as "no records available now".
+ if (records == null) {
+ return;
+ }
+
+ List<PartitionState> nonEmpty = new LinkedList<>();
+
+ for (PartitionState p : partitionStates) {
+ p.recordIter = records.records(p.topicPartition).iterator();
+ if (p.recordIter.hasNext()) {
+ nonEmpty.add(p);
+ }
+ }
+
+ // cycle through the partitions in order to interleave records from each.
+ curBatch = Iterators.cycle(nonEmpty);
+ }
+
+ // Initializes the primary consumer (seeking to checkpointed offsets), starts the
+ // poll thread, and sets up a second "offset" consumer that periodically fetches the
+ // latest offsets for backlog estimation. Returns the result of the first advance().
+ @Override
+ public boolean start() throws IOException {
+ consumer = source.consumerFactoryFn.apply(source.consumerConfig);
+ consumer.assign(source.assignedPartitions);
+
+ // seek to consumedOffset + 1 if it is set
+ for (PartitionState p : partitionStates) {
+ if (p.consumedOffset >= 0) {
+ LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
+ consumer.seek(p.topicPartition, p.consumedOffset + 1);
+ } else {
+ LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+ }
+ }
+
+ // start consumer read loop.
+ // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+ consumerPollThread.submit(
+ new Runnable() {
+ public void run() {
+ consumerPollLoop();
+ }
+ });
+
+ // offsetConsumer setup :
+
+ Object groupId = source.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
+ // override group_id and disable auto_commit so that it does not interfere with main consumer
+ String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name,
+ (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+ Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig);
+ offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+ offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
+ offsetConsumer.assign(source.assignedPartitions);
+
+ // refresh latest offsets immediately, then every OFFSET_UPDATE_INTERVAL_SECONDS.
+ offsetFetcherThread.scheduleAtFixedRate(
+ new Runnable() {
+ public void run() {
+ updateLatestOffsets();
+ }
+ }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+
+ return advance();
+ }
+
+ // Advances to the next record, interleaving partitions within the current batch and
+ // pulling a new batch from the consumer thread when the current one is exhausted.
+ // Returns false when no record is available within the short poll timeout.
+ @Override
+ public boolean advance() throws IOException {
+ /* Read first record (if any). we need to loop here because :
+ * - (a) some records initially need to be skipped if they are before consumedOffset
+ * - (b) if curBatch is empty, we want to fetch next batch and then advance.
+ * - (c) curBatch is an iterator of iterators. we interleave the records from each.
+ * curBatch.next() might return an empty iterator.
+ */
+ while (true) {
+ if (curBatch.hasNext()) {
+ PartitionState pState = curBatch.next();
+
+ if (!pState.recordIter.hasNext()) { // -- (c)
+ pState.recordIter = Collections.emptyIterator(); // drop ref
+ curBatch.remove();
+ continue;
+ }
+
+ ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
+ long consumed = pState.consumedOffset;
+ long offset = rawRecord.offset();
+
+ if (consumed >= 0 && offset <= consumed) { // -- (a)
+ // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
+ // should we check if the offset is way off from consumedOffset (say > 1M)?
+ LOG.warn("{}: ignoring already consumed offset {} for {}",
+ this, offset, pState.topicPartition);
+ continue;
+ }
+
+ // sanity check
+ if (consumed >= 0 && (offset - consumed) != 1) {
+ LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
+ this, pState.topicPartition, consumed, offset - consumed - 1);
+ }
+
+ if (curRecord == null) {
+ LOG.info("{}: first record offset {}", name, offset);
+ }
+
+ curRecord = null; // user coders below might throw.
+
+ // apply user coders. might want to allow skipping records that fail to decode.
+ // TODO: wrap exceptions from coders to make explicit to users
+ KafkaRecord<K, V> record = new KafkaRecord<K, V>(
+ rawRecord.topic(),
+ rawRecord.partition(),
+ rawRecord.offset(),
+ decode(rawRecord.key(), source.keyCoder),
+ decode(rawRecord.value(), source.valueCoder));
+
+ curTimestamp = source.timestampFn.apply(record);
+ curRecord = record;
+
+ // record size = key bytes + value bytes, feeding the per-partition moving average.
+ int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) +
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
+ pState.recordConsumed(offset, recordSize);
+ return true;
+
+ } else { // -- (b)
+ nextBatch();
+
+ if (!curBatch.hasNext()) {
+ return false;
+ }
+ }
+ }
+ }
+
+ private static byte[] nullBytes = new byte[0];
+ private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
+ // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
+ // This makes it impossible for user to distinguish between zero length byte and null.
+ // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
+ // coder.
+ byte[] toDecode = bytes == null ? nullBytes : bytes;
+ return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
+ }
+
+ // update latest offset for each partition.
+ // called from offsetFetcher thread
+ private void updateLatestOffsets() {
+ for (PartitionState p : partitionStates) {
+ try {
+ // seekToEnd + position is the only way to get the latest offset with the 0.9 consumer.
+ offsetConsumer.seekToEnd(p.topicPartition);
+ long offset = offsetConsumer.position(p.topicPartition);
+ p.setLatestOffset(offset);
+ } catch (Exception e) {
+ LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e);
+ p.setLatestOffset(-1L); // reset
+ }
+
+ LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
+ this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+ }
+
+ LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
+ }
+
+ // Watermark: the user-supplied watermarkFn when present, otherwise the timestamp of the
+ // last record read; Long.MIN_VALUE before any record has been read.
+ @Override
+ public Instant getWatermark() {
+ if (curRecord == null) {
+ LOG.warn("{}: getWatermark() : no records have been read yet.", name);
+ return initialWatermark;
+ }
+
+ if (source.watermarkFn.isPresent()) {
+ return source.watermarkFn.get().apply(curRecord);
+ }
+ return curTimestamp;
+ }
+
+ // Snapshot of the consumed offset of every assigned partition, in partition order.
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
+ Lists.transform(partitionStates,
+ new Function<PartitionState, PartitionMark>() {
+ public PartitionMark apply(PartitionState p) {
+ return new PartitionMark(p.topicPartition, p.consumedOffset);
+ }
+ }
+ )));
+ }
+
+ @Override
+ public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+ // should we delay updating consumed offset till this point? Mostly not required.
+ // NOTE(review): returns null instead of throwing NoSuchElementException when nothing has
+ // been read yet -- confirm callers only invoke this after advance() returned true.
+ return curRecord;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return curTimestamp;
+ }
+
+
+ // Sum of the per-partition backlog estimates; reports BACKLOG_UNKNOWN as soon as any
+ // single partition's backlog is unknown.
+ @Override
+ public long getSplitBacklogBytes() {
+ long totalBacklog = 0;
+
+ for (PartitionState partition : partitionStates) {
+ long partitionBacklog = partition.approxBacklogInBytes();
+ if (partitionBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+ return UnboundedReader.BACKLOG_UNKNOWN;
+ }
+ totalBacklog += partitionBacklog;
+ }
+
+ return totalBacklog;
+ }
+
+ // Shuts down both background threads and closes both consumers.
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ // NOTE(review): poll() drains at most one pending batch from the SynchronousQueue; with a
+ // single poll thread one blocked put() is the most that can be pending -- confirm.
+ availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
+ consumer.wakeup();
+ consumerPollThread.shutdown();
+ offsetFetcherThread.shutdown();
+ // 'true' swallows IOExceptions from close; best-effort cleanup.
+ Closeables.close(offsetConsumer, true);
+ Closeables.close(consumer, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
new file mode 100644
index 0000000..76e688b
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.values.KV;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * KafkaRecord contains key and value of the record as well as metadata for the record (topic name,
+ * partition id, and offset).
+ */
+public class KafkaRecord<K, V> implements Serializable {
+
+ private final String topic;
+ private final int partition;
+ private final long offset;
+ private final KV<K, V> kv;
+
+ public KafkaRecord(
+ String topic,
+ int partition,
+ long offset,
+ K key,
+ V value) {
+ this(topic, partition, offset, KV.of(key, value));
+ }
+
+ public KafkaRecord(
+ String topic,
+ int partition,
+ long offset,
+ KV<K, V> kv) {
+
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ this.kv = kv;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public KV<K, V> getKV() {
+ return kv;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof KafkaRecord) {
+ @SuppressWarnings("unchecked")
+ KafkaRecord<Object, Object> other = (KafkaRecord<Object, Object>) obj;
+ return topic.equals(other.topic)
+ && partition == other.partition
+ && offset == other.offset
+ && kv.equals(other.kv);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
new file mode 100644
index 0000000..8a3e7f5
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.KV;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * {@link Coder} for {@link KafkaRecord}.
+ */
+public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final VarLongCoder longCoder = VarLongCoder.of();
+ private static final VarIntCoder intCoder = VarIntCoder.of();
+
+ private final KvCoder<K, V> kvCoder;
+
+ @JsonCreator
+ public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ KvCoder<?, ?> kvCoder = KvCoder.of(components);
+ return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+ }
+
+ public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
+ return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
+ }
+
+ public KafkaRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+ this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+ }
+
+ @Override
+ public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ stringCoder.encode(value.getTopic(), outStream, nested);
+ intCoder.encode(value.getPartition(), outStream, nested);
+ longCoder.encode(value.getOffset(), outStream, nested);
+ kvCoder.encode(value.getKV(), outStream, nested);
+ }
+
+ @Override
+ public KafkaRecord<K, V> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ Context nested = context.nested();
+ return new KafkaRecord<K, V>(
+ stringCoder.decode(inStream, nested),
+ intCoder.decode(inStream, nested),
+ longCoder.decode(inStream, nested),
+ kvCoder.decode(inStream, nested));
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return kvCoder.getCoderArguments();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ kvCoder.verifyDeterministic();
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
+ return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+ //TODO : do we have to implement getEncodedSize()?
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+ if (consistentWithEquals()) {
+ return value;
+ } else {
+ return new KafkaRecord<Object, Object>(
+ value.getTopic(),
+ value.getPartition(),
+ value.getOffset(),
+ (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
+ }
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return kvCoder.consistentWithEquals();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
new file mode 100644
index 0000000..96ffc98
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
/**
 * Tests of {@link KafkaSource}.
 */
@RunWith(JUnit4.class)
public class KafkaIOTest {
  /*
   * The tests below borrow code and structure from CountingSourceTest. In addition they verify
   * that the reader interleaves the records from multiple partitions.
   *
   * Other tests to consider :
   *  - test KafkaRecordCoder
   */

  // Builds a mock consumer pre-populated with records distributed among the given topics, each
  // with the given number of partitions. Records are assigned in round-robin order among the
  // partitions; each record's value is its 0-based global sequence number as a big-endian long.
  private static MockConsumer<byte[], byte[]> mkMockConsumer(
      List<String> topics, int partitionsPerTopic, int numElements) {

    final List<TopicPartition> partitions = new ArrayList<>();
    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();

    for (String topic : topics) {
      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
      for (int i = 0; i < partitionsPerTopic; i++) {
        partitions.add(new TopicPartition(topic, i));
        partIds.add(new PartitionInfo(topic, i, null, null, null));
      }
      partitionMap.put(topic, partIds);
    }

    int numPartitions = partitions.size();
    long[] offsets = new long[numPartitions]; // next offset to assign, per partition

    for (int i = 0; i < numElements; i++) {
      int pIdx = i % numPartitions; // round-robin over all partitions of all topics
      TopicPartition tp = partitions.get(pIdx);

      if (!records.containsKey(tp)) {
        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
      }
      records.get(tp).add(
          // Note: this interface has changed in 0.10. may get fixed before the release.
          new ConsumerRecord<byte[], byte[]>(
              tp.topic(),
              tp.partition(),
              offsets[pIdx]++,
              null, // key
              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
    }

    // MockConsumer only delivers records for partitions that have been assigned; override
    // assign() so the pre-built records become visible as soon as the reader subscribes.
    MockConsumer<byte[], byte[]> consumer =
        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
          // override assign() to add records that belong to the assigned partitions.
          public void assign(List<TopicPartition> assigned) {
            super.assign(assigned);
            for (TopicPartition tp : assigned) {
              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
                addRecord(r);
              }
              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
              seek(tp, 0); // start reading each assigned partition from the beginning
            }
          }
        };

    for (String topic : topics) {
      consumer.updatePartitions(topic, partitionMap.get(topic));
    }

    return consumer;
  }

  /** Serializable factory so the pipeline's Kafka source creates the mock consumer above. */
  private static class ConsumerFactoryFn
                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
    private final List<String> topics;
    private final int partitionsPerTopic;
    private final int numElements;

    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
      this.topics = topics;
      this.partitionsPerTopic = partitionsPerTopic;
      this.numElements = numElements;
    }

    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
      return mkMockConsumer(topics, partitionsPerTopic, numElements);
    }
  }

  /**
   * Creates a read transform backed by a mock consumer with two topics, with 10 partitions each.
   * numElements are (round-robin) assigned to all the 20 partitions.
   */
  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
      int numElements,
      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {

    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
        .withBootstrapServers("none")
        .withTopics(topics)
        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
        .withValueCoder(BigEndianLongCoder.of())
        .withMaxNumRecords(numElements);

    if (timestampFn != null) {
      return reader.withTimestampFn(timestampFn);
    } else {
      return reader;
    }
  }

  /** Asserts that every element in the input is a multiple of {@code num}. */
  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
    private final int num;

    public AssertMultipleOf(int num) {
      this.num = num;
    }

    @Override
    public Void apply(Iterable<Long> values) {
      for (Long v : values) {
        assertEquals(0, v % num);
      }
      return null;
    }
  }

  // Standard assertions that the input contains exactly the record ids 0 .. numElements-1.
  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
    // Count == numElements
    PAssert
      .thatSingleton(input.apply("Count", Count.<Long>globally()))
      .isEqualTo(numElements);
    // Unique count == numElements
    PAssert
      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
                          .apply("UniqueCount", Count.<Long>globally()))
      .isEqualTo(numElements);
    // Min == 0
    PAssert
      .thatSingleton(input.apply("Min", Min.<Long>globally()))
      .isEqualTo(0L);
    // Max == numElements-1
    PAssert
      .thatSingleton(input.apply("Max", Max.<Long>globally()))
      .isEqualTo(numElements - 1);
  }

  // Reads all records from the mock topics and checks the full sequence of ids is present.
  @Test
  @Category(RunnableOnService.class)
  public void testUnboundedSource() {
    Pipeline p = TestPipeline.create();
    int numElements = 1000;

    PCollection<Long> input = p
        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
            .withoutMetadata())
        .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);
    p.run();
  }

  // Reads a single explicitly-specified partition and verifies only that partition's
  // records (every 5th id, given 10-way round-robin starting at partition 5) are returned.
  @Test
  @Category(RunnableOnService.class)
  public void testUnboundedSourceWithExplicitPartitions() {
    Pipeline p = TestPipeline.create();
    int numElements = 1000;

    List<String> topics = ImmutableList.of("test");

    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
        .withBootstrapServers("none")
        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
        .withValueCoder(BigEndianLongCoder.of())
        .withMaxNumRecords(numElements / 10);

    PCollection<Long> input = p
        .apply(reader.withoutMetadata())
        .apply(Values.<Long>create());

    // assert that every element is a multiple of 5.
    PAssert
      .that(input)
      .satisfies(new AssertMultipleOf(5));

    PAssert
      .thatSingleton(input.apply(Count.<Long>globally()))
      .isEqualTo(numElements / 10L);

    p.run();
  }

  /** Emits the difference between each element's value and its assigned timestamp. */
  private static class ElementValueDiff extends DoFn<Long, Long> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      c.output(c.element() - c.timestamp().getMillis());
    }
  }

  // Verifies that the timestamp function is applied: with ValueAsTimestampFn, every element's
  // timestamp equals its value, so all diffs collapse to the single value 0.
  @Test
  @Category(RunnableOnService.class)
  public void testUnboundedSourceTimestamps() {
    Pipeline p = TestPipeline.create();
    int numElements = 1000;

    PCollection<Long> input = p
        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
        .apply(Values.<Long>create());

    addCountingAsserts(input, numElements);

    PCollection<Long> diffs = input
        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
    // This assert also confirms that diffs only has one unique value.
    PAssert.thatSingleton(diffs).isEqualTo(0L);

    p.run();
  }

  /** Strips Kafka metadata, keeping only the key-value payload. */
  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
    @Override
    public void processElement(ProcessContext ctx) throws Exception {
      ctx.output(ctx.element().getKV());
    }
  }

  // Splits the source and verifies the union of all splits covers every record exactly once.
  @Test
  @Category(RunnableOnService.class)
  public void testUnboundedSourceSplits() throws Exception {
    Pipeline p = TestPipeline.create();
    int numElements = 1000;
    int numSplits = 10;

    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
        mkKafkaReadTransform(numElements, null).makeSource();
    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
        initial.generateInitialSplits(numSplits, p.getOptions());
    assertEquals("Expected exact splitting", numSplits, splits.size());

    long elementsPerSplit = numElements / numSplits;
    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
    PCollectionList<Long> pcollections = PCollectionList.empty(p);
    for (int i = 0; i < splits.size(); ++i) {
      pcollections = pcollections.and(
          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
           .apply("collection " + i, Values.<Long>create()));
    }
    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());

    addCountingAsserts(input, numElements);
    p.run();
  }

  /**
   * A timestamp function that uses the given value as the timestamp.
   */
  private static class ValueAsTimestampFn
                implements SerializableFunction<KV<byte[], Long>, Instant> {
    @Override
    public Instant apply(KV<byte[], Long> input) {
      return new Instant(input.getValue());
    }
  }

  // Reads a few records, checkpoints, restores from the checkpoint, and verifies reading
  // resumes in the exact same global sequence (which also exercises partition interleaving).
  @Test
  public void testUnboundedSourceCheckpointMark() throws Exception {
    int numElements = 85; // 85 to make sure some partitions have more records than other.

    // create a single split:
    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .makeSource()
          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
          .get(0);

    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
    final int numToSkip = 3;
    // advance once:
    assertTrue(reader.start());

    // Advance the source numToSkip-1 elements and manually save state.
    for (long l = 0; l < numToSkip - 1; ++l) {
      assertTrue(reader.advance());
    }

    // Confirm that we get the expected element in sequence before checkpointing.

    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());

    // Checkpoint and restart, and confirm that the source continues correctly.
    KafkaCheckpointMark mark = CoderUtils.clone(
        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
    reader = source.createReader(null, mark);
    assertTrue(reader.start());

    // Confirm that we get the next elements in sequence.
    // This also confirms that Reader interleaves records from each partitions by the reader.
    for (int i = numToSkip; i < numElements; i++) {
      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
      assertEquals(i, reader.getCurrentTimestamp().getMillis());
      reader.advance();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 75f192c..95d1f55 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -36,6 +36,7 @@
<modules>
<module>hdfs</module>
+ <module>kafka</module>
</modules>
-</project>
\ No newline at end of file
+</project>