Posted to commits@crunch.apache.org by mk...@apache.org on 2016/10/25 02:17:35 UTC

crunch git commit: CRUNCH-620: Reduce "isn't a known config" warnings by slimming down ConsumerConfig properties

Repository: crunch
Updated Branches:
  refs/heads/master e929e0444 -> fbda02f46


CRUNCH-620: Reduce "isn't a known config" warnings by slimming down ConsumerConfig properties

Resolved by tagging the Kafka connection properties so that Kafka consumers can be built with a slimmed-down set of ConsumerConfig properties.

Signed-off-by: Micah Whitacre <mk...@gmail.com>
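For reference, the tag/filter round trip this patch introduces works roughly as follows. A minimal sketch against the new public KafkaInputFormat helpers shown in the diff below; the wrapper class, main method, and sample property values are illustrative only:

    import java.util.Properties;

    import org.apache.crunch.kafka.inputformat.KafkaInputFormat;

    public class TaggingExample {
      public static void main(String[] args) {
        // Raw consumer properties, e.g. as pulled from a Hadoop Configuration.
        Properties raw = new Properties();
        raw.setProperty("bootstrap.servers", "broker1:9092");
        raw.setProperty("group.id", "crunch-example");

        // Tag each property with the "org.apache.crunch.kafka.connection.properties"
        // prefix so it can ride along in the job configuration without tripping
        // ConsumerConfig's "isn't a known config" warnings.
        Properties tagged = KafkaInputFormat.tagExistingKafkaConnectionProperties(raw);
        // tagged now contains keys such as
        //   org.apache.crunch.kafka.connection.properties.bootstrap.servers

        // When the consumer is actually built, strip the prefix back off and drop
        // anything that was never tagged as a connection property.
        Properties forConsumer = KafkaInputFormat.filterConnectionProperties(tagged);
        // forConsumer holds only the original bootstrap.servers and group.id keys.
        System.out.println(forConsumer);
      }
    }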


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fbda02f4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fbda02f4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fbda02f4

Branch: refs/heads/master
Commit: fbda02f46961c17b3f444424b166fbf65262711c
Parents: e929e04
Author: Stefan Mendoza <st...@cerner.com>
Authored: Mon Sep 12 22:38:41 2016 -0500
Committer: Micah Whitacre <mk...@gmail.com>
Committed: Mon Oct 24 21:15:23 2016 -0500

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaSource.java    |  15 ++-
 .../kafka/inputformat/KafkaInputFormat.java     | 108 ++++++++++++++++++-
 .../kafka/inputformat/KafkaRecordReader.java    |  15 +--
 .../org/apache/crunch/kafka/ClusterTest.java    |   4 +-
 .../org/apache/crunch/kafka/KafkaSourceIT.java  |   8 +-
 .../kafka/inputformat/KafkaInputFormatIT.java   |  69 ++++++++++--
 6 files changed, 189 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
index 485604d..ba7788b 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -148,23 +148,20 @@ public class KafkaSource
     FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
 
     KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-
-    for (String name : kafkaConnectionProperties.stringPropertyNames()) {
-      bundle.set(name, kafkaConnectionProperties.getProperty(name));
-    }
+    KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle);
 
     return bundle;
   }
 
-  private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) {
+  private static <K, V> Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
     Properties props = new Properties();
-    props.putAll(kakfaConnectionProperties);
+    props.putAll(kafkaConnectionProperties);
 
     //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
 
-    return props;
+    return KafkaInputFormat.tagExistingKafkaConnectionProperties(props);
   }
 
 
@@ -173,8 +170,8 @@ public class KafkaSource
     // consumer will get closed when the iterable is fully consumed.
     // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
     // of parallelism when reading.
-    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
-    return new KafkaRecordsIterable<>(consumer, offsets, props);
+    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(KafkaInputFormat.filterConnectionProperties(props));
+    return new KafkaRecordsIterable<>(consumer, offsets, KafkaInputFormat.filterConnectionProperties(props));
   }
 
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
index eba4a97..0dadf97 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
@@ -36,17 +36,26 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
- * Basic input format for reading data from Kafka.  Data is read and maintained in its pure byte form and wrapped
- * inside of a {@link BytesWritable} instance.
+ * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped inside of a
+ * {@link BytesWritable} instance.
  *
  * Populating the configuration of the input format is handled with the convenience method of
  * {@link #writeOffsetsToConfiguration(Map, Configuration)}.  This should be done to ensure
  * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
  * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ *
+ * To suppress warnings generated by unused configs in the {@link org.apache.kafka.clients.consumer.ConsumerConfig ConsumerConfig},
+ * one can use {@link #tagExistingKafkaConnectionProperties(Properties) tagExistingKafkaConnectionProperties} and
+ * {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} to prefix Kafka connection properties with
+ * "org.apache.crunch.kafka.connection.properties" to allow for retrieval later using {@link #getConnectionPropertyFromKey(String)
+ * getConnectionPropertyFromKey} and {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
  */
+
 public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
 
   /**
@@ -74,6 +83,17 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
    */
   private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
 
+  /**
+   * Constant for constructing configuration keys for the Kafka connection properties.
+   */
+  private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
+
+  /**
+   * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig.
+   */
+  private static final Pattern CONNECTION_PROPERTY_REGEX =
+      Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$");
+
   private Configuration configuration;
 
   @Override
@@ -123,6 +143,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
 
   /**
    * Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
+   *
    * @param offsets The starting and ending offsets for the topics and partitions.
    * @param bundle the bundle into which the information should be persisted.
    */
@@ -134,6 +155,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
 
   /**
    * Writes the start and end offsets for the provided topic partitions to the {@code config}.
+   *
    * @param offsets The starting and ending offsets for the topics and partitions.
    * @param config the config into which the information should be persisted.
    */
@@ -232,4 +254,86 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable>
 
     return value;
   }
+
+  // The following methods are convenience methods for dealing with Kafka connection properties. This includes:
+  //    - writing Kafka connection properties to a FormatBundle
+  //    - generating tagged Kafka connection properties using the prefix "org.apache.crunch.kafka.connection.properties"
+  //    - retrieving Kafka connection properties prefixed by "org.apache.crunch.kafka.connection.properties"
+  //    - filtering out Kafka connection properties from a Properties object
+  //    - tagging all properties in a Properties object with the Kafka connection properties prefix
+  // The tagging of the Kafka connection properties allows for suppression of "isn't a known config" ConsumerConfig warnings that
+  // are generated by unused properties carried over from a Hadoop configuration.
+
+  /**
+   * Writes the Kafka connection properties to the {@code bundle}.
+   *
+   * @param connectionProperties the Kafka connection properties
+   * @param bundle the bundle into which the information should be persisted.
+   */
+  public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) {
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      bundle.set(name, connectionProperties.getProperty(name));
+    }
+  }
+
+  /**
+   * Prefixes a given property with "org.apache.crunch.kafka.connection.properties" to allow for filtering with
+   * {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
+   *
+   * @param property the Kafka connection property that will be prefixed for retrieval at a later time.
+   * @return the property prefixed "org.apache.crunch.kafka.connection.properties"
+   */
+  static String generateConnectionPropertyKey(String property) {
+    return KAFKA_CONNECTION_PROPERTY_BASE + "." + property;
+  }
+
+  /**
+   *
+   * Retrieves the original property that was tagged using {@link #generateConnectionPropertyKey(String)
+   * generateConnectionPropertyKey}.
+   *
+   * @param key the key that was tagged using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}.
+   * @return the original property prior to tagging.
+   */
+  static String getConnectionPropertyFromKey(String key) {
+    // Strip off the base key + a trailing "."
+    return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1);
+  }
+
+  /**
+   * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every
+   * property prefixed with "org.apache.crunch.kafka.connection.properties".
+   *
+   * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties"
+   * @return a {@link Properties} object representing Kafka connection properties
+   */
+  public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) {
+    Properties taggedProperties = new Properties();
+
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      taggedProperties.put(generateConnectionPropertyKey(name), connectionProperties.getProperty(name));
+    }
+
+    return taggedProperties;
+  }
+
+  /**
+   * Filters out Kafka connection properties that were tagged using {@link #generateConnectionPropertyKey(String)
+   * generateConnectionPropertyKey}.
+   *
+   * @param props the properties to be filtered.
+   * @return the properties containing Kafka connection information that were tagged using
+   *         {@link #generateConnectionPropertyKey(String)}.
+   */
+  public static Properties filterConnectionProperties(Properties props) {
+    Properties filteredProperties = new Properties();
+
+    for (final String name : props.stringPropertyNames()) {
+      if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) {
+        filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name));
+      }
+    }
+
+    return filteredProperties;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index 14c8030..3ed799b 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -46,6 +46,7 @@ import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties;
 
 /**
  * A {@link RecordReader} for pulling data from Kafka.
@@ -75,14 +76,15 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
     if(!(inputSplit instanceof KafkaInputSplit)){
       throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
     }
-    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
-    topicPartition = split.getTopicPartition();
-
-    connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration());
+    Properties kafkaConnectionProperties = filterConnectionProperties(
+            getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
 
-    consumer = new KafkaConsumer<>(connectionProperties);
+    consumer = new KafkaConsumer<>(kafkaConnectionProperties);
+    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
+    TopicPartition topicPartition = split.getTopicPartition();
 
     consumer.assign(Collections.singletonList(topicPartition));
+
     //suggested hack to gather info without gathering data
     consumer.poll(0);
     //now seek to the desired start location
@@ -119,8 +121,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
         }
         return true;
       } else {
-        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset,
-                endingOffset);
+        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset);
       }
     }
     record = null;

http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
index 836039c..38ded40 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -24,6 +24,7 @@ import kafka.serializer.Decoder;
 import kafka.serializer.Encoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
 import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
 import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
 import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
@@ -135,7 +136,8 @@ public class ClusterTest {
 
   public static Configuration getConsumerConfig() {
     Configuration kafkaConfig = new Configuration(conf);
-    KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig);
+    KafkaUtils.addKafkaConnectionProperties(KafkaInputFormat.tagExistingKafkaConnectionProperties(
+        getConsumerProperties()), kafkaConfig);
     return kafkaConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
index 3800c24..7f1323e 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -48,7 +48,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
-import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.matchers.JUnitMatchers.hasItem;
@@ -105,8 +105,10 @@ public class KafkaSourceIT {
 
     Set<String> keysRead = new HashSet<>();
     int numRecordsFound = 0;
+    String currentKey;
     for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
-      assertThat(keys, hasItem(new String(values.first().getBytes())));
+      currentKey = new String(values.first().getBytes());
+      assertThat(keys, hasItem(currentKey));
       numRecordsFound++;
       keysRead.add(new String(values.first().getBytes()));
     }
@@ -166,4 +168,4 @@ public class KafkaSourceIT {
       return new String(input.first().getBytes());
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
index d760a02..3e7ab6f 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
@@ -89,13 +89,17 @@ public class KafkaInputFormatIT {
     topic = testName.getMethodName();
     consumerProps = ClusterTest.getConsumerProperties();
 
-    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
-    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+    consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+        KafkaSource.BytesDeserializer.class.getName());
+    consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+        KafkaSource.BytesDeserializer.class.getName());
 
     config = ClusterTest.getConsumerConfig();
 
-    config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
-    config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+    config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+        KafkaSource.BytesDeserializer.class.getName());
+    config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+        KafkaSource.BytesDeserializer.class.getName());
   }
 
   @Test
@@ -183,9 +187,11 @@ public class KafkaInputFormatIT {
       recordReader.initialize(split, taskContext);
 
       int numRecordsFound = 0;
+      String currentKey;
       while (recordReader.nextKeyValue()) {
-        keysRead.add(new String(recordReader.getCurrentKey().getBytes()));
-        assertThat(keys, hasItem(new String(recordReader.getCurrentKey().getBytes())));
+        currentKey = new String(recordReader.getCurrentKey().getBytes());
+        keysRead.add(currentKey);
+        assertThat(keys, hasItem(currentKey));
         assertThat(recordReader.getCurrentValue(), is(notNullValue()));
         numRecordsFound++;
       }
@@ -354,6 +360,54 @@ public class KafkaInputFormatIT {
     }
   }
 
+  @Test
+  public void generateConnectionPropertyKey() {
+    String propertyName = "some.property";
+    String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName);
+    String expected = "org.apache.crunch.kafka.connection.properties.some.property";
+    assertThat(expected, is(actual));
+  }
+
+  @Test
+  public void getConnectionPropertyFromKey() {
+    String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property";
+    String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
+    String expected = "some.property";
+    assertThat(expected, is(actual));
+  }
+
+  @Test
+  public void writeConnectionPropertiesToBundle() {
+    FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class);
+    Properties connectionProperties = new Properties();
+    connectionProperties.put("key1", "value1");
+    connectionProperties.put("key2", "value2");
+    KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual);
+
+    FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class);
+    expected.set("key1", "value1");
+    expected.set("key2", "value2");
+
+    assertThat(expected, is(actual));
+  }
+
+  @Test
+  public void filterConnectionProperties() {
+    Properties props = new Properties();
+    props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
+    props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
+    props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
+    props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
+
+    Properties actual = KafkaInputFormat.filterConnectionProperties(props);
+    Properties expected = new Properties();
+    expected.put("key1", "value1");
+    expected.put("key2", "value2");
+
+    assertThat(expected, is(actual));
+  }
+
+
   @Test(expected=IllegalStateException.class)
   public void getOffsetsFromConfigMissingStart() {
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
@@ -403,5 +457,4 @@ public class KafkaInputFormatIT {
 
     Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
   }
-
-}
+}
\ No newline at end of file