Posted to commits@crunch.apache.org by jw...@apache.org on 2017/12/11 18:03:48 UTC

crunch git commit: CRUNCH-654: KafkaSource should use the new Kafka Consumer API instead of the SimpleConsumer. Contributed by Bryan Baugher.

Repository: crunch
Updated Branches:
  refs/heads/master 5609b0143 -> 8e5c2ad37


CRUNCH-654: KafkaSource should use the new Kafka Consumer API instead of the SimpleConsumer. Contributed by Bryan Baugher.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8e5c2ad3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8e5c2ad3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8e5c2ad3

Branch: refs/heads/master
Commit: 8e5c2ad37c6e2f0fbc428bddf69da3644d535456
Parents: 5609b01
Author: Josh Wills <jw...@apache.org>
Authored: Mon Dec 11 09:56:38 2017 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 11 09:58:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java     | 22 +++++++++++++++-----
 .../crunch/kafka/record/KafkaRecordReader.java  |  8 +++----
 .../org/apache/crunch/kafka/KafkaUtilsIT.java   | 12 +++++++++--
 .../kafka/utils/KafkaBrokerTestHarness.java     | 13 ++++++++++--
 pom.xml                                         |  2 +-
 5 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index f3da5e9..2ed6412 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -33,11 +33,13 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -147,7 +149,9 @@ public class KafkaUtils {
    *                                  the topics are {@code null}, empty or blank, or if there is an error parsing the
    *                                  properties.
    * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+   * @deprecated As of 1.0. Use beginning/end offset APIs on {@link org.apache.kafka.clients.consumer.Consumer}
    */
+  @Deprecated
   public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
     if (properties == null)
       throw new IllegalArgumentException("properties cannot be null");
@@ -179,7 +183,7 @@ public class KafkaUtils {
         topicMetadataResponse = consumer.send(topicMetadataRequest);
         break;
       } catch (Exception err) {
-        EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+        EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
         LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
             Arrays.toString(topics), endpoint.host()), err);
       } finally {
@@ -209,7 +213,12 @@ public class KafkaUtils {
           throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
             +" partition:"+partition.partitionId());
         }
-        Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+        EndPoint endPoint = new EndPoint(brokerEndPoint.host(), brokerEndPoint.port(),
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+
+        Broker leader = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+            Option.<String>empty());
 
         if (brokerRequests.containsKey(leader))
           requestInfo = brokerRequests.get(leader);
@@ -279,7 +288,7 @@ public class KafkaUtils {
    */
   private static SimpleConsumer getSimpleConsumer(final Broker broker) {
     // BrokerHost, BrokerPort, timeout, buffer size, client id
-    EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+    EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
     return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
   }
 
@@ -309,7 +318,10 @@ public class KafkaUtils {
         throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
             + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
       try {
-        brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT));
+        EndPoint endPoint = new EndPoint(brokerHostPort[0], Integer.parseInt(brokerHostPort[1]),
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+        brokers.add(new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+            Option.<String>empty()));
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
       }
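
For reference, the 0.10.2-style Broker construction that this patch repeats in
several places can be isolated into a small helper. This is a minimal sketch
mirroring the hunks above; the helper class/method names and the fixed broker
id of 0 are illustrative only:

    import java.util.Arrays;

    import kafka.cluster.Broker;
    import kafka.cluster.EndPoint;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.protocol.SecurityProtocol;
    import scala.Option;
    import scala.collection.JavaConversions;

    public class BrokerFactorySketch {
      /**
       * Kafka 0.10.2 removed the Broker(id, host, port, securityProtocol)
       * constructor; a Broker is now built from a Scala Seq of EndPoints
       * plus an optional rack.
       */
      public static Broker plaintextBroker(String host, int port) {
        EndPoint endPoint = new EndPoint(host, port,
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT);
        return new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
            Option.<String>empty());
      }
    }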

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index ec138b3..ef2ec49 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -17,9 +17,7 @@
  */
 package org.apache.crunch.kafka.record;
 
-import kafka.api.OffsetRequest;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -214,10 +212,10 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
    * @return the earliest offset of the topic partition
    */
   protected long getEarliestOffset() {
-    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
-        .getBrokerOffsets(kafkaConnectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+    Map<TopicPartition, Long> brokerOffsets = consumer.beginningOffsets(
+        Collections.singletonList(topicPartition));
     Long offset = brokerOffsets.get(topicPartition);
-    if (offset == null) {
+    if(offset == null){
       LOG.debug("Unable to determine earliest offset for {} so returning 0", topicPartition);
       return 0L;
     }
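
The deprecation note on getBrokerOffsets points at the Consumer offset APIs,
and the reader change above shows the earliest-offset half of that migration.
A minimal standalone sketch of the same lookup (the consumer properties and
partition are placeholders) might look like:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetLookupSketch {
      public static long earliestOffset(Properties props, TopicPartition partition) {
        // beginningOffsets (and its endOffsets counterpart) replace the
        // SimpleConsumer-based offset requests behind the deprecated
        // KafkaUtils.getBrokerOffsets.
        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
            props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
          Map<TopicPartition, Long> offsets =
              consumer.beginningOffsets(Collections.singletonList(partition));
          Long offset = offsets.get(partition);
          // Mirror KafkaRecordReader's fallback of 0 when no offset is found.
          return offset == null ? 0L : offset;
        }
      }
    }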

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
index 38c3fce..dc4ea82 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -18,11 +18,13 @@
 package org.apache.crunch.kafka;
 
 import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
 import org.apache.crunch.kafka.ClusterTest;
 import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -30,6 +32,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import scala.Option;
+import scala.collection.JavaConversions;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -63,7 +67,9 @@ public class KafkaUtilsIT {
     String brokerHost = brokerHostPort[0];
     int brokerPort = Integer.parseInt(brokerHostPort[1]);
 
-    broker = new Broker(0, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT);
+    EndPoint endPoint = new EndPoint(brokerHost, brokerPort,
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+    broker = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
   }
 
   @AfterClass
@@ -181,7 +187,9 @@ public class KafkaUtilsIT {
 
   @Test
   public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
-    final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+    EndPoint endPoint = new EndPoint("dummyBrokerHost1", 0,
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+    final Broker bad = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
     assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
     assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
index f47f168..13a4e2c 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -17,17 +17,20 @@
 
 package org.apache.crunch.kafka.utils;
 
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.Time;
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.utils.Time;
 import scala.Option;
+import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -341,7 +344,8 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
   }
 
   private static KafkaServer startBroker(KafkaConfig config) {
-    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty());
+    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty(),
+        JavaConversions.asScalaBuffer(Collections.<KafkaMetricsReporter>emptyList()));
     server.startup();
     return server;
   }
@@ -353,6 +357,11 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
     }
 
     @Override
+    public long hiResClockMs() {
+      return System.currentTimeMillis();
+    }
+
+    @Override
     public long nanoseconds() {
       return System.nanoTime();
     }
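
The harness's SystemTime now implements org.apache.kafka.common.utils.Time
(replacing kafka.utils.Time here), and 0.10.2 adds hiResClockMs() to that
interface, hence the new override above. A minimal standalone implementation,
assuming the four methods the 0.10.2 interface declares, would be:

    import org.apache.kafka.common.utils.Time;

    public class SystemTimeSketch implements Time {
      @Override
      public long milliseconds() {
        return System.currentTimeMillis();
      }

      @Override
      public long hiResClockMs() {
        // Wall-clock millis is sufficient for tests, matching the harness.
        return System.currentTimeMillis();
      }

      @Override
      public long nanoseconds() {
        return System.nanoTime();
      }

      @Override
      public void sleep(long ms) {
        try {
          Thread.sleep(ms);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }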

http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 248aa05..9af5374 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@ under the License.
     <avro.classifier>hadoop2</avro.classifier>
     <hive.version>2.1.0</hive.version>
 
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>0.10.2.1</kafka.version>
     <scala.base.version>2.11</scala.base.version>
     <scala.version>2.11.8</scala.version>
     <scalatest.version>2.2.4</scalatest.version>