You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2017/12/11 18:03:48 UTC
crunch git commit: CRUNCH-654: KafkaSource should use the new Kafka Consumer API instead of the SimpleConsumer. Contributed by Bryan Baugher.
Repository: crunch
Updated Branches:
refs/heads/master 5609b0143 -> 8e5c2ad37
CRUNCH-654: KafkaSource should use the new Kafka Consumer API instead of the SimpleConsumer. Contributed by Bryan Baugher.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8e5c2ad3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8e5c2ad3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8e5c2ad3
Branch: refs/heads/master
Commit: 8e5c2ad37c6e2f0fbc428bddf69da3644d535456
Parents: 5609b01
Author: Josh Wills <jw...@apache.org>
Authored: Mon Dec 11 09:56:38 2017 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 11 09:58:17 2017 -0800
----------------------------------------------------------------------
.../org/apache/crunch/kafka/KafkaUtils.java | 22 +++++++++++++++-----
.../crunch/kafka/record/KafkaRecordReader.java | 8 +++----
.../org/apache/crunch/kafka/KafkaUtilsIT.java | 12 +++++++++--
.../kafka/utils/KafkaBrokerTestHarness.java | 13 ++++++++++--
pom.xml | 2 +-
5 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index f3da5e9..2ed6412 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -33,11 +33,13 @@ import org.apache.commons.lang.StringUtils;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -147,7 +149,9 @@ public class KafkaUtils {
* the topics are {@code null}, empty or blank, or if there is an error parsing the
* properties.
* @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+ * @deprecated As of 1.0. Use beginning/end offset APIs on {@link org.apache.kafka.clients.consumer.Consumer}
*/
+ @Deprecated
public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
if (properties == null)
throw new IllegalArgumentException("properties cannot be null");
@@ -179,7 +183,7 @@ public class KafkaUtils {
topicMetadataResponse = consumer.send(topicMetadataRequest);
break;
} catch (Exception err) {
- EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+ EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
Arrays.toString(topics), endpoint.host()), err);
} finally {
@@ -209,7 +213,12 @@ public class KafkaUtils {
throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
+" partition:"+partition.partitionId());
}
- Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+ EndPoint endPoint = new EndPoint(brokerEndPoint.host(), brokerEndPoint.port(),
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+
+ Broker leader = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+ Option.<String>empty());
if (brokerRequests.containsKey(leader))
requestInfo = brokerRequests.get(leader);
@@ -279,7 +288,7 @@ public class KafkaUtils {
*/
private static SimpleConsumer getSimpleConsumer(final Broker broker) {
// BrokerHost, BrokerPort, timeout, buffer size, client id
- EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+ EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
}
@@ -309,7 +318,10 @@ public class KafkaUtils {
throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
+ Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
try {
- brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT));
+ EndPoint endPoint = new EndPoint(brokerHostPort[0], Integer.parseInt(brokerHostPort[1]),
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+ brokers.add(new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
+ Option.<String>empty()));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index ec138b3..ef2ec49 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -17,9 +17,7 @@
*/
package org.apache.crunch.kafka.record;
-import kafka.api.OffsetRequest;
import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -214,10 +212,10 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
* @return the earliest offset of the topic partition
*/
protected long getEarliestOffset() {
- Map<TopicPartition, Long> brokerOffsets = KafkaUtils
- .getBrokerOffsets(kafkaConnectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+ Map<TopicPartition, Long> brokerOffsets = consumer.beginningOffsets(
+ Collections.singletonList(topicPartition));
Long offset = brokerOffsets.get(topicPartition);
- if (offset == null) {
+ if(offset == null){
LOG.debug("Unable to determine earliest offset for {} so returning 0", topicPartition);
return 0L;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
index 38c3fce..dc4ea82 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -18,11 +18,13 @@
package org.apache.crunch.kafka;
import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
import org.apache.crunch.kafka.ClusterTest;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.junit.AfterClass;
import org.junit.Before;
@@ -30,6 +32,8 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import scala.Option;
+import scala.collection.JavaConversions;
import java.io.IOException;
import java.util.Arrays;
@@ -63,7 +67,9 @@ public class KafkaUtilsIT {
String brokerHost = brokerHostPort[0];
int brokerPort = Integer.parseInt(brokerHostPort[1]);
- broker = new Broker(0, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT);
+ EndPoint endPoint = new EndPoint(brokerHost, brokerPort,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+ broker = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
}
@AfterClass
@@ -181,7 +187,9 @@ public class KafkaUtilsIT {
@Test
public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
- final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+ EndPoint endPoint = new EndPoint("dummyBrokerHost1", 0,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
+ final Broker bad = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
index f47f168..13a4e2c 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -17,17 +17,20 @@
package org.apache.crunch.kafka.utils;
+import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.Time;
import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.utils.Time;
import scala.Option;
+import scala.collection.JavaConversions;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -341,7 +344,8 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
}
private static KafkaServer startBroker(KafkaConfig config) {
- KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty());
+ KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty(),
+ JavaConversions.asScalaBuffer(Collections.<KafkaMetricsReporter>emptyList()));
server.startup();
return server;
}
@@ -353,6 +357,11 @@ public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
}
@Override
+ public long hiResClockMs() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
public long nanoseconds() {
return System.nanoTime();
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8e5c2ad3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 248aa05..9af5374 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@ under the License.
<avro.classifier>hadoop2</avro.classifier>
<hive.version>2.1.0</hive.version>
- <kafka.version>0.10.0.1</kafka.version>
+ <kafka.version>0.10.2.1</kafka.version>
<scala.base.version>2.11</scala.base.version>
<scala.version>2.11.8</scala.version>
<scalatest.version>2.2.4</scalatest.version>