You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@crunch.apache.org by mk...@apache.org on 2016/10/25 02:17:35 UTC
crunch git commit: CRUNCH-620: Reduce "isn't a known config" warnings by slimming down ConsumerConfig properties
Repository: crunch
Updated Branches:
refs/heads/master e929e0444 -> fbda02f46
CRUNCH-620: Reduce "isn't a known config" warnings by slimming down ConsumerConfig properties
Resolved by tagging the Kafka connection properties with a dedicated prefix ("org.apache.crunch.kafka.connection.properties") so that Kafka consumers can be built from a slimmer set of ConsumerConfig properties.
Signed-off-by: Micah Whitacre <mk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fbda02f4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fbda02f4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fbda02f4
Branch: refs/heads/master
Commit: fbda02f46961c17b3f444424b166fbf65262711c
Parents: e929e04
Author: Stefan Mendoza <st...@cerner.com>
Authored: Mon Sep 12 22:38:41 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon Oct 24 21:15:23 2016 -0500
----------------------------------------------------------------------
.../org/apache/crunch/kafka/KafkaSource.java | 15 ++-
.../kafka/inputformat/KafkaInputFormat.java | 108 ++++++++++++++++++-
.../kafka/inputformat/KafkaRecordReader.java | 15 +--
.../org/apache/crunch/kafka/ClusterTest.java | 4 +-
.../org/apache/crunch/kafka/KafkaSourceIT.java | 8 +-
.../kafka/inputformat/KafkaInputFormatIT.java | 69 ++++++++++--
6 files changed, 189 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
index 485604d..ba7788b 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -148,23 +148,20 @@ public class KafkaSource
FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-
- for (String name : kafkaConnectionProperties.stringPropertyNames()) {
- bundle.set(name, kafkaConnectionProperties.getProperty(name));
- }
+ KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle);
return bundle;
}
- private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) {
+  private static Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
Properties props = new Properties();
- props.putAll(kakfaConnectionProperties);
+    props.putAll(kafkaConnectionProperties);
//Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
- return props;
+    return KafkaInputFormat.tagExistingKafkaConnectionProperties(props);
}
@@ -173,8 +170,8 @@ public class KafkaSource
// consumer will get closed when the iterable is fully consumed.
// skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
// of parallelism when reading.
- Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
- return new KafkaRecordsIterable<>(consumer, offsets, props);
+ Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(KafkaInputFormat.filterConnectionProperties(props));
+ return new KafkaRecordsIterable<>(consumer, offsets, KafkaInputFormat.filterConnectionProperties(props));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
index eba4a97..0dadf97 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
@@ -36,17 +36,26 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
/**
- * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped
- * inside of a {@link BytesWritable} instance.
+ * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped inside of a
+ * {@link BytesWritable} instance.
*
* Populating the configuration of the input format is handled with the convenience method of
* {@link #writeOffsetsToConfiguration(Map, Configuration)}. This should be done to ensure
* the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
* and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ *
+ * To suppress warnings generated by unused configs in the {@link org.apache.kafka.clients.consumer.ConsumerConfig ConsumerConfig},
+ * one can use {@link #tagExistingKafkaConnectionProperties(Properties) tagExistingKafkaConnectionProperties} and
+ * {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} to prefix Kafka connection properties with
+ * "org.apache.crunch.kafka.connection.properties" to allow for retrieval later using {@link #getConnectionPropertyFromKey(String)
+ * getConnectionPropertyFromKey} and {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
*/
+
public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
/**
@@ -74,6 +83,17 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
*/
private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
+ /**
+ * Constant for constructing configuration keys for the Kafka connection properties.
+ */
+ private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
+
+ /**
+ * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig.
+ */
+ private static final Pattern CONNECTION_PROPERTY_REGEX =
+ Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$");
+
private Configuration configuration;
@Override
@@ -123,6 +143,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
/**
* Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
+ *
* @param offsets The starting and ending offsets for the topics and partitions.
* @param bundle the bundle into which the information should be persisted.
*/
@@ -134,6 +155,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
/**
* Writes the start and end offsets for the provided topic partitions to the {@code config}.
+ *
* @param offsets The starting and ending offsets for the topics and partitions.
* @param config the config into which the information should be persisted.
*/
@@ -232,4 +254,96 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
return value;
}
+
+  // The following methods are convenience methods for dealing with Kafka connection properties. This includes:
+  //  - writing Kafka connection properties to a FormatBundle
+  //  - generating tagged Kafka connection properties using the prefix "org.apache.crunch.kafka.connection.properties"
+  //  - retrieving the original name of a property tagged with "org.apache.crunch.kafka.connection.properties"
+  //  - extracting (and un-tagging) the tagged Kafka connection properties from a Properties object
+  //  - tagging all properties in a Properties object with the Kafka connection properties prefix
+  // The tagging of the Kafka connection properties allows for suppression of "isn't a known config" ConsumerConfig warnings that
+  // are generated by unused properties carried over from a Hadoop configuration.
+
+  /**
+   * Writes the Kafka connection properties to the {@code bundle}. Property names are written as-is, so properties that
+   * should later be picked up by {@link #filterConnectionProperties(Properties)} must already be tagged, e.g. with
+   * {@link #tagExistingKafkaConnectionProperties(Properties)}.
+   *
+   * @param connectionProperties the Kafka connection properties
+   * @param bundle the bundle into which the information should be persisted.
+   */
+  public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle<?> bundle) {
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      bundle.set(name, connectionProperties.getProperty(name));
+    }
+  }
+
+  /**
+   * Prefixes a given property with "org.apache.crunch.kafka.connection.properties." to allow for filtering with
+   * {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
+   *
+   * @param property the Kafka connection property that will be prefixed for retrieval at a later time.
+   * @return the property prefixed with "org.apache.crunch.kafka.connection.properties."
+   */
+  static String generateConnectionPropertyKey(String property) {
+    return KAFKA_CONNECTION_PROPERTY_BASE + "." + property;
+  }
+
+  /**
+   * Retrieves the original property that was tagged using {@link #generateConnectionPropertyKey(String)
+   * generateConnectionPropertyKey}.
+   *
+   * @param key the key that was tagged using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}.
+   * @return the original property prior to tagging.
+   * @throws IllegalArgumentException if {@code key} is {@code null} or does not carry the connection property prefix.
+   */
+  static String getConnectionPropertyFromKey(String key) {
+    final String prefix = KAFKA_CONNECTION_PROPERTY_BASE + ".";
+    if (key == null || !key.startsWith(prefix)) {
+      // Without this check an untagged key would be silently truncated into a bogus property name.
+      throw new IllegalArgumentException("Not a tagged Kafka connection property key: " + key);
+    }
+    // Strip off the base key + the trailing "."
+    return key.substring(prefix.length());
+  }
+
+  /**
+   * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every
+   * property prefixed with "org.apache.crunch.kafka.connection.properties". Properties that are already tagged are
+   * copied unchanged, so tagging twice never produces a doubly-prefixed key that filtering could not undo.
+   *
+   * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties"
+   * @return a {@link Properties} object representing Kafka connection properties
+   */
+  public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) {
+    Properties taggedProperties = new Properties();
+
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      final String taggedName =
+          CONNECTION_PROPERTY_REGEX.matcher(name).matches() ? name : generateConnectionPropertyKey(name);
+      taggedProperties.put(taggedName, connectionProperties.getProperty(name));
+    }
+
+    return taggedProperties;
+  }
+
+  /**
+   * Extracts the Kafka connection properties that were tagged using {@link #generateConnectionPropertyKey(String)
+   * generateConnectionPropertyKey}, removing the tag from each name. Untagged properties are dropped.
+   *
+   * @param props the properties to be filtered.
+   * @return the un-tagged Kafka connection properties that were tagged using
+   * {@link #generateConnectionPropertyKey(String)}.
+   */
+  public static Properties filterConnectionProperties(Properties props) {
+    Properties filteredProperties = new Properties();
+
+    for (final String name : props.stringPropertyNames()) {
+      if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) {
+        filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name));
+      }
+    }
+
+    return filteredProperties;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index 14c8030..3ed799b 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -46,6 +46,7 @@ import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties;
/**
* A {@link RecordReader} for pulling data from Kafka.
@@ -75,14 +76,15 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
if(!(inputSplit instanceof KafkaInputSplit)){
throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
}
KafkaInputSplit split = (KafkaInputSplit) inputSplit;
topicPartition = split.getTopicPartition();
connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration());
- consumer = new KafkaConsumer<>(connectionProperties);
+    // Keep the fields populated as before; only the consumer gets the un-tagged Kafka connection properties.
+    consumer = new KafkaConsumer<>(filterConnectionProperties(connectionProperties));
consumer.assign(Collections.singletonList(topicPartition));
//suggested hack to gather info without gathering data
consumer.poll(0);
//now seek to the desired start location
@@ -119,8 +121,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
}
return true;
} else {
- LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset,
- endingOffset);
+ LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset);
}
}
record = null;
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
index 836039c..38ded40 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -24,6 +24,7 @@ import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
@@ -135,7 +136,8 @@ public class ClusterTest {
public static Configuration getConsumerConfig() {
Configuration kafkaConfig = new Configuration(conf);
- KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig);
+    KafkaUtils.addKafkaConnectionProperties(
+        KafkaInputFormat.tagExistingKafkaConnectionProperties(getConsumerProperties()), kafkaConfig);
return kafkaConfig;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
index 3800c24..7f1323e 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -48,7 +48,6 @@ import java.util.Properties;
import java.util.Set;
import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
-import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.hasItem;
@@ -105,8 +104,9 @@ public class KafkaSourceIT {
Set<String> keysRead = new HashSet<>();
int numRecordsFound = 0;
for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
- assertThat(keys, hasItem(new String(values.first().getBytes())));
+      String currentKey = new String(values.first().getBytes());
+      assertThat(keys, hasItem(currentKey));
numRecordsFound++;
- keysRead.add(new String(values.first().getBytes()));
+      keysRead.add(currentKey);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
index d760a02..3e7ab6f 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
@@ -89,13 +89,17 @@ public class KafkaInputFormatIT {
topic = testName.getMethodName();
consumerProps = ClusterTest.getConsumerProperties();
- consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
- consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+ consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+ KafkaSource.BytesDeserializer.class.getName());
+ consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+ KafkaSource.BytesDeserializer.class.getName());
config = ClusterTest.getConsumerConfig();
- config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
- config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+ config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+ KafkaSource.BytesDeserializer.class.getName());
+ config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+ KafkaSource.BytesDeserializer.class.getName());
}
@Test
@@ -183,9 +187,11 @@ public class KafkaInputFormatIT {
recordReader.initialize(split, taskContext);
int numRecordsFound = 0;
+ String currentKey;
while (recordReader.nextKeyValue()) {
- keysRead.add(new String(recordReader.getCurrentKey().getBytes()));
- assertThat(keys, hasItem(new String(recordReader.getCurrentKey().getBytes())));
+ currentKey = new String(recordReader.getCurrentKey().getBytes());
+ keysRead.add(currentKey);
+ assertThat(keys, hasItem(currentKey));
assertThat(recordReader.getCurrentValue(), is(notNullValue()));
numRecordsFound++;
}
@@ -354,6 +360,54 @@ public class KafkaInputFormatIT {
}
}
+  @Test
+  public void generateConnectionPropertyKey() {
+    String propertyName = "some.property";
+    String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName);
+    String expected = "org.apache.crunch.kafka.connection.properties.some.property";
+    assertThat(actual, is(expected));
+  }
+
+  @Test
+  public void getConnectionPropertyFromKey() {
+    String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property";
+    String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
+    String expected = "some.property";
+    assertThat(actual, is(expected));
+  }
+
+  @Test
+  public void writeConnectionPropertiesToBundle() {
+    FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class);
+    Properties connectionProperties = new Properties();
+    connectionProperties.put("key1", "value1");
+    connectionProperties.put("key2", "value2");
+    KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual);
+
+    FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class);
+    expected.set("key1", "value1");
+    expected.set("key2", "value2");
+
+    assertThat(actual, is(expected));
+  }
+
+  @Test
+  public void filterConnectionProperties() {
+    Properties props = new Properties();
+    props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
+    props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
+    props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
+    props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
+
+    Properties actual = KafkaInputFormat.filterConnectionProperties(props);
+    Properties expected = new Properties();
+    expected.put("key1", "value1");
+    expected.put("key2", "value2");
+
+    assertThat(actual, is(expected));
+  }
+
+
@Test(expected=IllegalStateException.class)
public void getOffsetsFromConfigMissingStart() {
Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
@@ -403,5 +457,4 @@ public class KafkaInputFormatIT {
Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
}
-
}