Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:20 UTC
[4/5] hive git commit: HIVE-20639 : Add ability to Write Data from
Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index c252455..2225f19 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -25,24 +25,31 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
- * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end.
+ * This class implements an Iterator over a single Kafka topic partition.
+ *
+ * Notes:
+ * The user of this class has to provide a functional Kafka Consumer and is responsible for closing it afterward.
+ * The user of this class is responsible for thread safety if the provided consumer is shared across threads.
+ *
*/
-public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
private static final String
POLL_TIMEOUT_HINT =
String.format("Try increasing poll timeout using Hive Table property [%s]",
- KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT);
+ KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName());
private static final String
ERROR_POLL_TIMEOUT_FORMAT =
"Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] "
@@ -54,30 +61,38 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
private final long endOffset;
private final long startOffset;
private final long pollTimeoutMs;
+ private final Duration pollTimeoutDurationMs;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private ConsumerRecords<byte[], byte[]> records;
+ /**
+ * Holds the kafka consumer position after the last poll() call.
+ */
private long consumerPosition;
private ConsumerRecord<byte[], byte[]> nextRecord;
private boolean hasMore = true;
+ /**
+ * Each Kafka Consumer poll() call returns a batch of records; this Iterator is used to loop over that batch.
+ */
private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;
/**
- * Iterator over Kafka Records over a single {@code topicPartition} inclusive {@code startOffset},
- * up to exclusive {@code endOffset}.
- * <p>
- * If {@code requestedStartOffset} is not null will seek up to that offset
- * Else If {@code requestedStartOffset} is null will seek to beginning see
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
- * <p>
- * When provided with {@code requestedEndOffset}, will return records up to consumer position == endOffset
- * Else If {@code requestedEndOffset} is null it will read up to the current end of the stream
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)}
- * <p>
- * @param consumer functional kafka consumer.
- * @param topicPartition kafka topic partition.
- * @param requestedStartOffset requested start position.
- * @param requestedEndOffset requested end position. If null will read up to current last
- * @param pollTimeoutMs poll time out in ms.
+ * Kafka record Iterator pulling from a single {@code topicPartition}, from the inclusive {@code requestedStartOffset}
+ * up to the exclusive {@code requestedEndOffset}.
+ *
+ * This iterator can block on polling up to a designated timeout.
+ *
+ * If the brokers return no record within the poll timeout duration, this is surfaced as an exception.
+ * The timeout exception is retriable, therefore users of this class can retry if needed.
+ *
+ * @param consumer Functional Kafka consumer; the user must initialize and close it.
+ * @param topicPartition Target Kafka topic partition.
+ * @param requestedStartOffset Requested start offset position; if NULL the iterator will seek to the beginning using:
+ * {@link Consumer#seekToBeginning(java.util.Collection)}.
+ *
+ * @param requestedEndOffset Requested end position. If null, reads up to the last available offset,
+ * which is given by:
+ * {@link Consumer#seekToEnd(java.util.Collection)}.
+ * @param pollTimeoutMs Positive number indicating the poll timeout in ms.
*/
KafkaRecordIterator(Consumer<byte[], byte[]> consumer,
TopicPartition topicPartition,
@@ -87,6 +102,7 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
this.pollTimeoutMs = pollTimeoutMs;
+ this.pollTimeoutDurationMs = Duration.ofMillis(pollTimeoutMs);
Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be positive number");
final List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
// assign topic partition to consumer
@@ -138,7 +154,12 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
}
/**
- * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset.
+ * Checks if there are more records to be consumed and pulls more from the broker if the current batch is empty.
+ * This method might block for up to {@link this#pollTimeoutMs} milliseconds to pull records from the Kafka broker.
+ *
+ * @throws PollTimeoutException if poll returns zero records and the consumer position did not reach the requested endOffset.
+ * Such an exception is retriable and may be transient, so a retry may succeed.
+ *
* @return true if has more records to be consumed.
*/
@Override public boolean hasNext() {
@@ -158,20 +179,20 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
/**
* Poll more records from the Kafka Broker.
*
- * @throws IllegalStateException if no records returned before consumer position reaches target end offset.
+ * @throws PollTimeoutException if poll returns zero records and the consumer's position < the requested endOffset.
*/
private void pollRecords() {
if (LOG.isTraceEnabled()) {
stopwatch.reset().start();
}
- records = consumer.poll(pollTimeoutMs);
+ records = consumer.poll(pollTimeoutDurationMs);
if (LOG.isTraceEnabled()) {
stopwatch.stop();
LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
// Fail if we can not poll within one lap of pollTimeoutMs.
if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
- throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
+ throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
pollTimeoutMs,
topicPartition.toString(),
startOffset,
@@ -201,4 +222,12 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
nextRecord = null;
}
}
+
+ static final class PollTimeoutException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ PollTimeoutException(String message) {
+ super(message);
+ }
+ }
}
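
For reference, a minimal usage sketch of the iterator above. It assumes code living in the org.apache.hadoop.hive.kafka package (the class is now package-private); the broker address, offsets and poll timeout are illustrative and not part of this patch:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
    props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition topicPartition = new TopicPartition("my_topic", 0);
      // Read offsets [0, 100) with a 5 second poll timeout; the caller owns and closes the consumer.
      Iterator<ConsumerRecord<byte[], byte[]>> it =
          new KafkaRecordIterator(consumer, topicPartition, 0L, 100L, 5000L);
      try {
        while (it.hasNext()) {
          ConsumerRecord<byte[], byte[]> record = it.next();
          // process record.key() / record.value()
        }
      } catch (KafkaRecordIterator.PollTimeoutException e) {
        // Retriable: hasNext() timed out before reaching the end offset, the caller may retry.
      }
    }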
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
new file mode 100644
index 0000000..746de61
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+/**
+ * Kafka Records Reader implementation.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaRecordReader extends RecordReader<NullWritable, KafkaWritable>
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, KafkaWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
+
+ private KafkaConsumer<byte[], byte[]> consumer = null;
+ private Configuration config = null;
+ private KafkaWritable currentWritableValue;
+ private Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor = null;
+
+ private long totalNumberRecords = 0L;
+ private long consumedRecords = 0L;
+ private long readBytes = 0L;
+ private volatile boolean started = false;
+ private long startOffset = -1L;
+ private long endOffset = Long.MAX_VALUE;
+
+ @SuppressWarnings("WeakerAccess") public KafkaRecordReader() {
+ }
+
+ private void initConsumer() {
+ if (consumer == null) {
+ LOG.info("Initializing Kafka Consumer");
+ final Properties properties = KafkaUtils.consumerProperties(config);
+ String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ Preconditions.checkNotNull(brokerString, "broker end point can not be null");
+ LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
+ consumer = new KafkaConsumer<>(properties);
+ }
+ }
+
+ @SuppressWarnings("WeakerAccess") public KafkaRecordReader(KafkaInputSplit inputSplit,
+ Configuration jobConf) {
+ initialize(inputSplit, jobConf);
+ }
+
+ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) {
+ if (!started) {
+ this.config = jobConf;
+ startOffset = inputSplit.getStartOffset();
+ endOffset = inputSplit.getEndOffset();
+ TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
+ Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
+ "Start [%s] has to be positive and less or equal than End [%s]",
+ startOffset,
+ endOffset);
+ totalNumberRecords += endOffset - startOffset;
+ initConsumer();
+ long
+ pollTimeout =
+ config.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1);
+ LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
+ this.recordsCursor =
+ startOffset == endOffset ?
+ new EmptyIterator() :
+ new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
+ started = true;
+ }
+ }
+
+ @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext context) {
+ initialize((KafkaInputSplit) inputSplit, context.getConfiguration());
+ }
+
+ @Override public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) {
+ if (started && recordsCursor.hasNext()) {
+ ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
+ bytesWritable.set(record, startOffset, endOffset);
+ consumedRecords += 1;
+ readBytes += record.serializedValueSize();
+ return true;
+ }
+ return false;
+ }
+
+ @Override public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override public KafkaWritable createValue() {
+ return new KafkaWritable();
+ }
+
+ @Override public long getPos() {
+ return -1;
+ }
+
+ @Override public boolean nextKeyValue() {
+ currentWritableValue = new KafkaWritable();
+ if (next(NullWritable.get(), currentWritableValue)) {
+ return true;
+ }
+ currentWritableValue = null;
+ return false;
+ }
+
+ @Override public NullWritable getCurrentKey() {
+ return NullWritable.get();
+ }
+
+ @Override public KafkaWritable getCurrentValue() {
+ return Preconditions.checkNotNull(currentWritableValue);
+ }
+
+ @Override public float getProgress() {
+ if (consumedRecords == 0) {
+ return 0f;
+ }
+ if (consumedRecords >= totalNumberRecords) {
+ return 1f;
+ }
+ return consumedRecords * 1.0f / totalNumberRecords;
+ }
+
+ @Override public void close() {
+ LOG.trace("total read bytes [{}]", readBytes);
+ if (consumer != null) {
+ consumer.wakeup();
+ consumer.close();
+ }
+ }
+
+ /**
+ * Empty iterator for empty splits where startOffset == endOffset; this avoids a clumsy if condition.
+ */
+ private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+ @Override public boolean hasNext() {
+ return false;
+ }
+
+ @Override public ConsumerRecord<byte[], byte[]> next() {
+ throw new IllegalStateException("this is an empty iterator");
+ }
+ }
+}
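
A rough sketch of driving the mapred (old API) read path implemented above; the split (a KafkaInputSplit) and conf (a Configuration) are assumed to be supplied by the framework:

    KafkaRecordReader reader = new KafkaRecordReader(split, conf);
    NullWritable key = reader.createKey();
    KafkaWritable value = reader.createValue();
    try {
      while (reader.next(key, value)) {
        // value now carries one Kafka record plus its partition/offset/timestamp metadata
      }
      float progress = reader.getProgress(); // consumedRecords / totalNumberRecords, capped at 1
    } finally {
      reader.close(); // wakes up and closes the underlying KafkaConsumer
    }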
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
deleted file mode 100644
index 1b00f85..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import javax.annotation.Nullable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * Writable implementation of Kafka ConsumerRecord.
- * Serialized in the form
- * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) |
- * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) |
- * {@code value} (byte []) | {@code recordKey.size()}| {@code recordKey (byte [])}
- */
-public class KafkaRecordWritable implements Writable {
-
- /**
- * Kafka partition id
- */
- private int partition;
- /**
- * Record Offset
- */
- private long offset;
- /**
- * Fist offset given by the input split used to pull the event {@link KafkaPullerInputSplit#getStartOffset()}
- */
- private long startOffset;
- /**
- * Last Offset given by the input split used to pull the event {@link KafkaPullerInputSplit#getEndOffset()}
- */
- private long endOffset;
- /**
- * Event timestamp provided by Kafka Record {@link ConsumerRecord#timestamp()}
- */
- private long timestamp;
- /**
- * Record value
- */
- private byte[] value;
-
- /**
- * Record key content or null
- */
- private byte[] recordKey;
-
-
- void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, long endOffset) {
- this.partition = consumerRecord.partition();
- this.timestamp = consumerRecord.timestamp();
- this.offset = consumerRecord.offset();
- this.value = consumerRecord.value();
- this.recordKey = consumerRecord.key();
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- KafkaRecordWritable(int partition,
- long offset,
- long timestamp,
- byte[] value,
- long startOffset,
- long endOffset,
- @Nullable byte[] recordKey) {
- this.partition = partition;
- this.offset = offset;
- this.timestamp = timestamp;
- this.value = value;
- this.recordKey = recordKey;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- @SuppressWarnings("WeakerAccess") public KafkaRecordWritable() {
- }
-
- @Override public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(timestamp);
- dataOutput.writeInt(partition);
- dataOutput.writeLong(offset);
- dataOutput.writeLong(startOffset);
- dataOutput.writeLong(endOffset);
- dataOutput.writeInt(value.length);
- dataOutput.write(value);
- if (recordKey != null) {
- dataOutput.writeInt(recordKey.length);
- dataOutput.write(recordKey);
- } else {
- dataOutput.writeInt(-1);
- }
- }
-
- @Override public void readFields(DataInput dataInput) throws IOException {
- timestamp = dataInput.readLong();
- partition = dataInput.readInt();
- offset = dataInput.readLong();
- startOffset = dataInput.readLong();
- endOffset = dataInput.readLong();
- int dataSize = dataInput.readInt();
- if (dataSize > 0) {
- value = new byte[dataSize];
- dataInput.readFully(value);
- } else {
- value = new byte[0];
- }
- int keyArraySize = dataInput.readInt();
- if (keyArraySize > -1) {
- recordKey = new byte[keyArraySize];
- dataInput.readFully(recordKey);
- } else {
- recordKey = null;
- }
- }
-
- int getPartition() {
- return partition;
- }
-
- long getOffset() {
- return offset;
- }
-
- long getTimestamp() {
- return timestamp;
- }
-
- byte[] getValue() {
- return value;
- }
-
- long getStartOffset() {
- return startOffset;
- }
-
- long getEndOffset() {
- return endOffset;
- }
-
- @Nullable
- byte[] getRecordKey() {
- return recordKey;
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaRecordWritable)) {
- return false;
- }
- KafkaRecordWritable writable = (KafkaRecordWritable) o;
- return partition == writable.partition
- && offset == writable.offset
- && startOffset == writable.startOffset
- && endOffset == writable.endOffset
- && timestamp == writable.timestamp
- && Arrays.equals(value, writable.value)
- && Arrays.equals(recordKey, writable.recordKey);
- }
-
- @Override public int hashCode() {
- int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp);
- result = 31 * result + Arrays.hashCode(value);
- result = 31 * result + Arrays.hashCode(recordKey);
- return result;
- }
-
- @Override public String toString() {
- return "KafkaRecordWritable{"
- + "partition="
- + partition
- + ", offset="
- + offset
- + ", startOffset="
- + startOffset
- + ", endOffset="
- + endOffset
- + ", timestamp="
- + timestamp
- + ", value="
- + Arrays.toString(value)
- + ", recordKey="
- + Arrays.toString(recordKey)
- + '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
index 8fbdfda..256796a 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -63,14 +63,14 @@ import java.util.function.Predicate;
*/
class KafkaScanTrimmer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaScanTrimmer.class);
- private final Map<TopicPartition, KafkaPullerInputSplit> fullHouse;
+ private final Map<TopicPartition, KafkaInputSplit> fullHouse;
private final KafkaConsumer kafkaConsumer;
/**
* @param fullHouse initial full scan to be pruned, this is a map of Topic partition to input split.
* @param kafkaConsumer kafka consumer used to pull offsets for time filter if needed
*/
- KafkaScanTrimmer(Map<TopicPartition, KafkaPullerInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
+ KafkaScanTrimmer(Map<TopicPartition, KafkaInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
this.fullHouse = fullHouse;
this.kafkaConsumer = kafkaConsumer;
}
@@ -83,8 +83,8 @@ class KafkaScanTrimmer {
*
* @return tiny house of of the full house based on filter expression
*/
- Map<TopicPartition, KafkaPullerInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
- Map<TopicPartition, KafkaPullerInputSplit> optimizedScan = parseAndOptimize(filterExpression);
+ Map<TopicPartition, KafkaInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
+ Map<TopicPartition, KafkaInputSplit> optimizedScan = parseAndOptimize(filterExpression);
if (LOG.isDebugEnabled()) {
if (optimizedScan != null) {
@@ -113,7 +113,7 @@ class KafkaScanTrimmer {
*
* @return Map of optimized kafka range scans or null if it is impossible to optimize.
*/
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> parseAndOptimize(ExprNodeDesc expression) {
+ @Nullable private Map<TopicPartition, KafkaInputSplit> parseAndOptimize(ExprNodeDesc expression) {
if (expression.getClass() != ExprNodeGenericFuncDesc.class) {
return null;
}
@@ -154,7 +154,7 @@ class KafkaScanTrimmer {
*
* @return leaf scan or null if can not figure out push down
*/
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
+ @Nullable private Map<TopicPartition, KafkaInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
PredicateLeaf.Operator operator,
boolean negation) {
if (expr.getChildren().size() != 2) {
@@ -192,8 +192,7 @@ class KafkaScanTrimmer {
constantDesc = (ExprNodeConstantDesc) extracted[0];
}
-
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.PARTITION.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.PARTITION.getName())) {
return buildScanFromPartitionPredicate(fullHouse,
operator,
((Number) constantDesc.getValue()).intValue(),
@@ -201,7 +200,7 @@ class KafkaScanTrimmer {
negation);
}
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.OFFSET.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.OFFSET.getName())) {
return buildScanFromOffsetPredicate(fullHouse,
operator,
((Number) constantDesc.getValue()).longValue(),
@@ -209,7 +208,7 @@ class KafkaScanTrimmer {
negation);
}
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.TIMESTAMP.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.TIMESTAMP.getName())) {
long timestamp = ((Number) constantDesc.getValue()).longValue();
//noinspection unchecked
return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer);
@@ -229,8 +228,8 @@ class KafkaScanTrimmer {
* @return filtered kafka scan
*/
- @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromPartitionPredicate(
- Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromPartitionPredicate(Map<TopicPartition,
+ KafkaInputSplit> fullScan,
PredicateLeaf.Operator operator,
int partitionConst,
boolean flip,
@@ -262,12 +261,12 @@ class KafkaScanTrimmer {
predicate = topicPartition -> true;
}
- ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit> builder = ImmutableMap.builder();
+ ImmutableMap.Builder<TopicPartition, KafkaInputSplit> builder = ImmutableMap.builder();
// Filter full scan based on predicate
fullScan.entrySet()
.stream()
.filter(entry -> predicate.test(entry.getKey()))
- .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone()));
+ .forEach(entry -> builder.put(entry.getKey(), KafkaInputSplit.copyOf(entry.getValue())));
return builder.build();
}
@@ -280,8 +279,8 @@ class KafkaScanTrimmer {
*
* @return optimized kafka scan
*/
- @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition,
- KafkaPullerInputSplit> fullScan,
+ @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition,
+ KafkaInputSplit> fullScan,
PredicateLeaf.Operator operator,
long offsetConst,
boolean flip,
@@ -320,54 +319,50 @@ class KafkaScanTrimmer {
endOffset = -1;
}
- final Map<TopicPartition, KafkaPullerInputSplit> newScan = new HashMap<>();
+ final Map<TopicPartition, KafkaInputSplit> newScan = new HashMap<>();
fullScan.forEach((tp, existingInputSplit) -> {
- final KafkaPullerInputSplit newInputSplit;
+ final KafkaInputSplit newInputSplit;
if (startOffset != -1 && endOffset == -1) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(),
+ newInputSplit = new KafkaInputSplit(tp.topic(),
tp.partition(),
//if the user ask for start offset > max offset will replace with last offset
Math.min(startOffset, existingInputSplit.getEndOffset()),
existingInputSplit.getEndOffset(),
existingInputSplit.getPath());
} else if (endOffset != -1 && startOffset == -1) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
+ newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
//@TODO check this, if user ask for non existing end offset ignore it and position head on start
// This can be an issue when doing ingestion from kafka into Hive, what happen if there is some gaps
// Shall we fail the ingest or carry-on and ignore non existing offsets
Math.max(endOffset, existingInputSplit.getStartOffset()), existingInputSplit.getPath());
} else if (endOffset == startOffset + 1) {
if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(),
+ newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(),
// non existing offset will be seeking last offset
existingInputSplit.getEndOffset(), existingInputSplit.getEndOffset(), existingInputSplit.getPath());
} else {
newInputSplit =
- new KafkaPullerInputSplit(tp.topic(),
- tp.partition(),
- startOffset,
- endOffset,
- existingInputSplit.getPath());
+ new KafkaInputSplit(tp.topic(), tp.partition(), startOffset, endOffset, existingInputSplit.getPath());
}
} else {
newInputSplit =
- new KafkaPullerInputSplit(tp.topic(),
+ new KafkaInputSplit(tp.topic(),
tp.partition(),
existingInputSplit.getStartOffset(),
existingInputSplit.getEndOffset(),
existingInputSplit.getPath());
}
- newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit));
+ newScan.put(tp, KafkaInputSplit.intersectRange(newInputSplit, existingInputSplit));
});
return newScan;
}
- @Nullable private static Map<TopicPartition, KafkaPullerInputSplit> buildScanForTimesPredicate(
- Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+ @Nullable private static Map<TopicPartition, KafkaInputSplit> buildScanForTimesPredicate(
+ Map<TopicPartition, KafkaInputSplit> fullHouse,
PredicateLeaf.Operator operator,
long timestamp,
boolean flip,
@@ -385,11 +380,11 @@ class KafkaScanTrimmer {
// NULL will be returned for that partition If the message format version in a partition is before 0.10.0
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap);
return Maps.toMap(fullHouse.keySet(), tp -> {
- KafkaPullerInputSplit existing = fullHouse.get(tp);
+ KafkaInputSplit existing = fullHouse.get(tp);
OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp);
//Null in case filter doesn't match or field not existing ie old broker thus return empty scan.
final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset();
- return new KafkaPullerInputSplit(Objects.requireNonNull(tp).topic(),
+ return new KafkaInputSplit(Objects.requireNonNull(tp).topic(),
tp.partition(),
startOffset,
existing.getEndOffset(),
@@ -410,21 +405,21 @@ class KafkaScanTrimmer {
*
* @return either full scan or an optimized sub scan.
*/
- private Map<TopicPartition, KafkaPullerInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
- Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ private Map<TopicPartition, KafkaInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
+ Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
- fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaPullerInputSplit.copyOf(input)));
+ fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaInputSplit.copyOf(input)));
for (ExprNodeDesc child : expr.getChildren()) {
- Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+ Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
if (scan != null) {
Set<TopicPartition> currentKeys = ImmutableSet.copyOf(currentScan.keySet());
currentKeys.forEach(key -> {
- KafkaPullerInputSplit newSplit = scan.get(key);
- KafkaPullerInputSplit oldSplit = currentScan.get(key);
+ KafkaInputSplit newSplit = scan.get(key);
+ KafkaInputSplit oldSplit = currentScan.get(key);
currentScan.remove(key);
if (newSplit != null) {
- KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit);
+ KafkaInputSplit intersectionSplit = KafkaInputSplit.intersectRange(newSplit, oldSplit);
if (intersectionSplit != null) {
currentScan.put(key, intersectionSplit);
}
@@ -436,18 +431,18 @@ class KafkaScanTrimmer {
return currentScan;
}
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
- final Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ @Nullable private Map<TopicPartition, KafkaInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
+ final Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
for (ExprNodeDesc child : expr.getChildren()) {
- Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+ Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
if (scan == null) {
// if any of the children is unknown bailout
return null;
}
scan.forEach((tp, input) -> {
- KafkaPullerInputSplit existingSplit = currentScan.get(tp);
- currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
+ KafkaInputSplit existingSplit = currentScan.get(tp);
+ currentScan.put(tp, KafkaInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
});
}
return currentScan;
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
new file mode 100644
index 0000000..51cfa24
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.rmi.server.UID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Generic Kafka SerDe that allows the user to delegate the SerDe work to another class such as Avro,
+ * Json or any class that supports {@link BytesWritable}.
+ * If users wish to implement their own SerDe, all they need is a SerDe that extends
+ * {@link org.apache.hadoop.hive.serde2.AbstractSerDe} and accepts {@link BytesWritable} as the value.
+ */
+@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES }) public class KafkaSerDe
+ extends AbstractSerDe {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSerDe.class);
+
+ /**
+ * Delegate SerDe used to serialize and deserialize data from/to Kafka.
+ */
+ private AbstractSerDe delegateSerDe;
+
+ /**
+ * Delegate Object Inspector used to deserialize the row; this OI is constructed by the {@code delegateSerDe}.
+ */
+ private StructObjectInspector delegateDeserializerOI;
+
+ /**
+ * Delegate Object Inspector used to Serialize the row as byte array.
+ */
+ private StructObjectInspector delegateSerializerOI;
+
+ /**
+ * Object Inspector of original row plus metadata.
+ */
+ private ObjectInspector objectInspector;
+ private final List<String> columnNames = Lists.newArrayList();
+ private BytesConverter bytesConverter;
+
+ @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
+ //This method is called before {@link org.apache.hadoop.hive.kafka.KafkaStorageHandler.preCreateTable}
+ //Thus we need to default to org.apache.hadoop.hive.kafka.KafkaUtils.DEFAULT_PROPERTIES if any property is needed
+ final String
+ className =
+ tbl.getProperty(KafkaTableProperties.SERDE_CLASS_NAME.getName(),
+ KafkaTableProperties.SERDE_CLASS_NAME.getDefaultValue());
+ delegateSerDe = KafkaUtils.createDelegate(className);
+ //noinspection deprecation
+ delegateSerDe.initialize(conf, tbl);
+
+ if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
+ throw new SerDeException("Was expecting Struct Object Inspector but have " + delegateSerDe.getObjectInspector()
+ .getClass()
+ .getName());
+ }
+ delegateDeserializerOI = (StructObjectInspector) delegateSerDe.getObjectInspector();
+
+ // Build column names; order matters here.
+ columnNames.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+ .stream()
+ .map(StructField::getFieldName)
+ .collect(Collectors.toList()));
+ columnNames.addAll(MetadataColumn.KAFKA_METADATA_COLUMN_NAMES);
+
+ final List<ObjectInspector> inspectors = new ArrayList<>(columnNames.size());
+ inspectors.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+ .stream()
+ .map(StructField::getFieldObjectInspector)
+ .collect(Collectors.toList()));
+ inspectors.addAll(MetadataColumn.KAFKA_METADATA_INSPECTORS);
+ objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+
+ // Setup Read and Write Path From/To Kafka
+ if (delegateSerDe.getSerializedClass() == Text.class) {
+ bytesConverter = new TextBytesConverter();
+ } else if (delegateSerDe.getSerializedClass() == AvroGenericRecordWritable.class) {
+ String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
+ Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
+ Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
+ LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
+ bytesConverter = new AvroBytesConverter(schema);
+ } else {
+ bytesConverter = new BytesWritableConverter();
+ }
+ }
+
+ @Override public Class<? extends Writable> getSerializedClass() {
+ return delegateSerDe.getSerializedClass();
+ }
+
+ @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+
+ if (!(objInspector instanceof StructObjectInspector)) {
+ throw new SerDeException("Object inspector has to be "
+ + StructObjectInspector.class.getName()
+ + " but got "
+ + objInspector.getClass().getName());
+ }
+ StructObjectInspector structObjectInspector = (StructObjectInspector) objInspector;
+ List<Object> data = structObjectInspector.getStructFieldsDataAsList(obj);
+ if (delegateSerializerOI == null) {
+ //@TODO check if i can cache this if it is the same.
+ delegateSerializerOI =
+ new SubStructObjectInspector(structObjectInspector, data.size() - MetadataColumn.values().length);
+ }
+ // We always append the metadata columns to the end of the row.
+ final List<Object> row = data.subList(0, data.size() - MetadataColumn.values().length);
+ //@TODO @FIXME use column names instead of positional indexes, which are hard to read and review
+ Object key = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size());
+ Object partition = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 1);
+ Object offset = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 2);
+ Object timestamp = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 3);
+
+ if (PrimitiveObjectInspectorUtils.getLong(offset, MetadataColumn.OFFSET.getObjectInspector()) != -1) {
+ LOG.error("Can not insert values into `__offset` column, has to be [-1]");
+ throw new SerDeException("Can not insert values into `__offset` column, has to be [-1]");
+ }
+
+ final byte[]
+ keyBytes =
+ key == null ? null : PrimitiveObjectInspectorFactory.writableBinaryObjectInspector.getPrimitiveJavaObject(key);
+ final long
+ recordTs =
+ timestamp == null ?
+ -1 :
+ PrimitiveObjectInspectorUtils.getLong(timestamp, MetadataColumn.TIMESTAMP.getObjectInspector());
+ final int
+ recordPartition =
+ partition == null ?
+ -1 :
+ PrimitiveObjectInspectorUtils.getInt(partition, MetadataColumn.PARTITION.getObjectInspector());
+
+ //noinspection unchecked
+ return new KafkaWritable(recordPartition,
+ recordTs,
+ bytesConverter.getBytes(delegateSerDe.serialize(row, delegateSerializerOI)),
+ keyBytes);
+ }
+
+ @Override public SerDeStats getSerDeStats() {
+ return delegateSerDe.getSerDeStats();
+ }
+
+ @Override public Object deserialize(Writable blob) throws SerDeException {
+ KafkaWritable record = (KafkaWritable) blob;
+ final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(record.getValue()));
+ return columnNames.stream().map(name -> {
+ final MetadataColumn metadataColumn = MetadataColumn.forName(name);
+ if (metadataColumn != null) {
+ return record.getHiveWritable(metadataColumn);
+ }
+ return delegateDeserializerOI.getStructFieldData(row, delegateDeserializerOI.getStructFieldRef(name));
+ }).collect(Collectors.toList());
+ }
+
+ @Override public ObjectInspector getObjectInspector() {
+ return objectInspector;
+ }
+
+ /**
+ * Returns a view of the input object inspector's field list between
+ * <tt>0</tt>, inclusive, and the specified <tt>toIndex</tt>, exclusive.
+ */
+ private static final class SubStructObjectInspector extends StructObjectInspector {
+
+ private final StructObjectInspector baseOI;
+ private final List<? extends StructField> structFields;
+
+ /**
+ * Returns a live view of the base Object Inspector from index 0 to {@code toIndex}, exclusive.
+ * @param baseOI base Object Inspector.
+ * @param toIndex toIndex.
+ */
+ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
+ this.baseOI = baseOI;
+ structFields = baseOI.getAllStructFieldRefs().subList(0, toIndex);
+ }
+
+ /**
+ * Returns all the fields.
+ */
+ @Override public List<? extends StructField> getAllStructFieldRefs() {
+ return structFields;
+ }
+
+ /**
+ * Look up a field.
+ * @param fieldName fieldName to be looked up.
+ */
+ @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public StructField getStructFieldRef(String fieldName) {
+ return this.getAllStructFieldRefs()
+ .stream()
+ .filter(ref -> ref.getFieldName().equals(fieldName))
+ .findFirst()
+ .get();
+ }
+
+ /**
+ * returns null for data = null.
+ * @param data input.
+ * @param fieldRef field to extract.
+ */
+ @Override public Object getStructFieldData(Object data, StructField fieldRef) {
+ return baseOI.getStructFieldData(data, fieldRef);
+ }
+
+ /**
+ * returns null for data = null.
+ * @param data input data.
+ */
+ @Override public List<Object> getStructFieldsDataAsList(Object data) {
+ if (data == null) {
+ return null;
+ }
+ int size = getAllStructFieldRefs().size();
+ List<Object> res = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ res.add(baseOI.getStructFieldData(data, getAllStructFieldRefs().get(i)));
+ }
+ return res;
+ }
+
+ /**
+ * Returns the name of the data type that is inspected by this
+ * ObjectInspector. This is used to display the type information to the user.
+ *
+ * For primitive types, the type name is standardized. For other types, the
+ * type name can be something like "list<int>", "map<int,string>", java class
+ * names, or user-defined type names similar to typedef.
+ */
+ @Override public String getTypeName() {
+ return baseOI.getTypeName();
+ }
+
+ /**
+ * An ObjectInspector must inherit from one of the following interfaces if
+ * getCategory() returns: PRIMITIVE: PrimitiveObjectInspector LIST:
+ * ListObjectInspector MAP: MapObjectInspector STRUCT: StructObjectInspector.
+ */
+ @Override public Category getCategory() {
+ return baseOI.getCategory();
+ }
+ }
+
+ /**
+ * Class that encapsulates the logic of serializing and deserializing byte arrays to/from the delegate writable format.
+ * @param <K> delegate writable class.
+ */
+ private interface BytesConverter<K extends Writable> {
+ byte[] getBytes(K writable);
+
+ K getWritable(byte[] value);
+ }
+
+ private static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
+ private final Schema schema;
+ private final DatumReader<GenericRecord> dataReader;
+ private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>();
+ private final AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable();
+ private final UID uid = new UID();
+
+ AvroBytesConverter(Schema schema) {
+ this.schema = schema;
+ dataReader = new SpecificDatumReader<>(this.schema);
+ }
+
+ @Override public byte[] getBytes(AvroGenericRecordWritable writable) {
+ GenericRecord record = writable.getRecord();
+ byte[] valueBytes = null;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(out, null);
+ gdw.setSchema(record.getSchema());
+ gdw.write(record, be);
+ out.flush();
+ valueBytes = out.toByteArray();
+ } catch (IOException e) {
+ Throwables.propagate(new SerDeException(e));
+ }
+ return valueBytes;
+ }
+
+ @Override public AvroGenericRecordWritable getWritable(byte[] value) {
+ GenericRecord avroRecord = null;
+ try {
+ avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+ } catch (IOException e) {
+ Throwables.propagate(new SerDeException(e));
+ }
+
+ avroGenericRecordWritable.setRecord(avroRecord);
+ avroGenericRecordWritable.setRecordReaderID(uid);
+ avroGenericRecordWritable.setFileSchema(avroRecord.getSchema());
+ return avroGenericRecordWritable;
+ }
+ }
+
+ private static class BytesWritableConverter implements BytesConverter<BytesWritable> {
+ @Override public byte[] getBytes(BytesWritable writable) {
+ return writable.getBytes();
+ }
+
+ @Override public BytesWritable getWritable(byte[] value) {
+ return new BytesWritable(value);
+ }
+ }
+
+ private static class TextBytesConverter implements BytesConverter<Text> {
+ Text text = new Text();
+ @Override public byte[] getBytes(Text writable) {
+ //@TODO There is really no reason to decode the string and then re-encode it to bytes.
+ //@FIXME Text can add a CTRL-CHAR ^0 at the end of the string and the JSON SerDe does not like that.
+ try {
+ return writable.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8"));
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override public Text getWritable(byte[] value) {
+ text.set(value);
+ return text;
+ }
+ }
+
+}
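
The AvroBytesConverter above boils down to a plain Avro binary round trip. Here is a standalone sketch with an illustrative schema literal (the record layout is hypothetical; only the Avro API calls mirror the converter, and the snippet is assumed to run in a method declared to throw IOException, with org.apache.avro.generic.GenericData imported in addition to the imports at the top of the file):

    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("f", "hello");

    // Write path: GenericRecord -> byte[], same calls as AvroBytesConverter.getBytes().
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    byte[] bytes = out.toByteArray();

    // Read path: byte[] -> GenericRecord, same calls as AvroBytesConverter.getWritable().
    DatumReader<GenericRecord> reader = new SpecificDatumReader<>(schema);
    GenericRecord back = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));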
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index 96222c9..0d64cd9 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -18,10 +18,21 @@
package org.apache.hadoop.hive.kafka;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -31,22 +42,29 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
/**
- * Hive Kafka storage handler to allow user querying Stream of tuples from a Kafka queue.
+ * Hive Kafka storage handler to allow users to read from and write to the Kafka message bus.
*/
-public class KafkaStorageHandler implements HiveStorageHandler {
+@SuppressWarnings("ALL") public class KafkaStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
private static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHandler.class);
private static final String KAFKA_STORAGE_HANDLER = "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
@@ -54,19 +72,19 @@ public class KafkaStorageHandler implements HiveStorageHandler {
private Configuration configuration;
@Override public Class<? extends InputFormat> getInputFormatClass() {
- return KafkaPullerInputFormat.class;
+ return KafkaInputFormat.class;
}
@Override public Class<? extends OutputFormat> getOutputFormatClass() {
- return NullOutputFormat.class;
+ return KafkaOutputFormat.class;
}
@Override public Class<? extends AbstractSerDe> getSerDeClass() {
- return GenericKafkaSerDe.class;
+ return KafkaSerDe.class;
}
@Override public HiveMetaHook getMetaHook() {
- return null;
+ return this;
}
@Override public HiveAuthorizationProvider getAuthorizationProvider() {
@@ -74,43 +92,33 @@ public class KafkaStorageHandler implements HiveStorageHandler {
}
@Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- String topic = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, "");
+ configureCommonProperties(tableDesc, jobProperties);
+ }
+
+ private void configureCommonProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ String topic = tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), "");
if (topic.isEmpty()) {
throw new IllegalArgumentException("Kafka topic missing set table property->"
- + KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+ + KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
}
- jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, topic);
- String brokerString = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, "");
+ jobProperties.put(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), topic);
+ String
+ brokerString =
+ tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), "");
if (brokerString.isEmpty()) {
throw new IllegalArgumentException("Broker address missing set table property->"
- + KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+ + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ }
+ jobProperties.put(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), brokerString);
+ Arrays.stream(KafkaTableProperties.values())
+ .filter(tableProperty -> !tableProperty.isMandatory())
+ .forEach(tableProperty -> jobProperties.put(tableProperty.getName(),
+ tableDesc.getProperties().getProperty(tableProperty.getName())));
+ // If the user asks for exactly-once semantics (EOS) then set the consumer isolation level to read_committed.
+ if (jobProperties.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())
+ .equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name())) {
+ jobProperties.put("kafka.consumer.isolation.level", "read_committed");
}
- jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, brokerString);
-
- jobProperties.put(KafkaStreamingUtils.SERDE_CLASS_NAME,
- tableDesc.getProperties().getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName()));
- LOG.debug("Table properties: SerDe class name {}", jobProperties.get(KafkaStreamingUtils.SERDE_CLASS_NAME));
-
- //set extra properties
- tableDesc.getProperties()
- .entrySet()
- .stream()
- .filter(objectObjectEntry -> objectObjectEntry.getKey()
- .toString()
- .toLowerCase()
- .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
- .forEach(entry -> {
- String
- key =
- entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
- if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
- throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
- }
- String value = entry.getValue().toString();
- jobProperties.put(key, value);
- LOG.info("Setting extra job properties: key [{}] -> value [{}]", key, value);
-
- });
}
@Override public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
@@ -118,19 +126,21 @@ public class KafkaStorageHandler implements HiveStorageHandler {
}
@Override public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-
+ configureCommonProperties(tableDesc, jobProperties);
}
@Override public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
configureInputJobProperties(tableDesc, jobProperties);
+ configureOutputJobProperties(tableDesc, jobProperties);
}
@Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
Map<String, String> properties = new HashMap<>();
configureInputJobProperties(tableDesc, properties);
+ configureOutputJobProperties(tableDesc, properties);
properties.forEach(jobConf::set);
try {
- KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
+ KafkaUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -149,11 +159,11 @@ public class KafkaStorageHandler implements HiveStorageHandler {
}
@Override public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
- String topic = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+ String topic = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
if (topic == null || topic.isEmpty()) {
throw new MetaException("topic is null or empty");
}
- String brokers = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+ String brokers = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
if (brokers == null || brokers.isEmpty()) {
throw new MetaException("kafka brokers string is null or empty");
}
@@ -161,19 +171,232 @@ public class KafkaStorageHandler implements HiveStorageHandler {
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(getConf()));
table.getParameters()
.entrySet()
.stream()
.filter(objectObjectEntry -> objectObjectEntry.getKey()
.toLowerCase()
- .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
+ .startsWith(KafkaUtils.CONSUMER_CONFIGURATION_PREFIX))
.forEach(entry -> {
- String key = entry.getKey().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
- if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
+ String key = entry.getKey().substring(KafkaUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
+ if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(key)) {
throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
}
properties.put(key, entry.getValue());
});
return new KafkaStorageHandlerInfo(topic, properties);
}
+
+ private Properties buildProducerProperties(Table table) {
+ String brokers = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ if (brokers == null || brokers.isEmpty()) {
+ throw new RuntimeException("kafka brokers string is null or empty");
+ }
+ final Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ table.getParameters()
+ .entrySet()
+ .stream()
+ .filter(objectObjectEntry -> objectObjectEntry.getKey()
+ .toLowerCase()
+ .startsWith(KafkaUtils.PRODUCER_CONFIGURATION_PREFIX))
+ .forEach(entry -> {
+ String key = entry.getKey().substring(KafkaUtils.PRODUCER_CONFIGURATION_PREFIX.length() + 1);
+ if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(key)) {
+ throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
+ }
+ properties.put(key, entry.getValue());
+ });
+ return properties;
+ }
+
+ @Override public LockType getLockType(WriteEntity writeEntity) {
+ if (writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT)) {
+ return LockType.SHARED_READ;
+ }
+ return LockType.SHARED_WRITE;
+ }
+
+ private String getQueryId() {
+ return HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID);
+ }
+
+ @Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
+ boolean
+ isExactlyOnce =
+ table.getParameters()
+ .get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())
+ .equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name());
+ String optimisticCommitVal = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName());
+ boolean isTwoPhaseCommit = !Boolean.parseBoolean(optimisticCommitVal);
+ if (!isExactlyOnce || !isTwoPhaseCommit) {
+ // If it is not a two-phase commit there is no open transaction to handle.
+ return;
+ }
+
+ final Path queryWorkingDir = getQueryWorkingDir(table);
+ final Map<String, Pair<Long, Short>> transactionsMap;
+ final int maxTries = Integer.parseInt(table.getParameters().get(KafkaTableProperties.MAX_RETRIES.getName()));
+ // We have 4 stages ahead of us:
+ // 1 Fetch the transaction states from HDFS.
+ // 2 Build/init all the Kafka producers and perform a pre-commit call to check if we can go ahead with the commit.
+ // 3 Commit the transactions one by one.
+ // 4 Clean the working directory.
+
+ // First stage: fetch the transaction states.
+ final RetryUtils.Task<Map<String, Pair<Long, Short>>>
+ fetchTransactionStates =
+ new RetryUtils.Task<Map<String, Pair<Long, Short>>>() {
+ @Override public Map<String, Pair<Long, Short>> perform() throws Exception {
+ return TransactionalKafkaWriter.getTransactionsState(FileSystem.get(getConf()), queryWorkingDir);
+ }
+ };
+
+ try {
+ transactionsMap = RetryUtils.retry(fetchTransactionStates, (error) -> (error instanceof IOException), maxTries);
+ } catch (Exception e) {
+ // Cannot go further.
+ LOG.error("Cannot fetch transaction states due to [{}]", e.getMessage());
+ throw new MetaException(e.getMessage());
+ }
+
+ // Second stage: resume the producers and pre-commit.
+ final Properties baseProducerPros = buildProducerProperties(table);
+ final Map<String, HiveKafkaProducer> producersMap = new HashMap<>();
+ final RetryUtils.Task<Void> buildProducersTask = new RetryUtils.Task<Void>() {
+ @Override public Void perform() throws Exception {
+ assert producersMap.size() == 0;
+ transactionsMap.forEach((key, value) -> {
+ // Base producer properties, missing the transactional id.
+ baseProducerPros.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, key);
+ HiveKafkaProducer<byte[], byte[]> producer = new HiveKafkaProducer<>(baseProducerPros);
+ producer.resumeTransaction(value.getLeft(), value.getRight());
+ // This is a dummy RPC call to ensure that the producer is still resumable and to signal the pre-commit as per:
+ // https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka#EndPhase
+ producer.sendOffsetsToTransaction(ImmutableMap.of(), "__dry_run");
+ producersMap.put(key, producer);
+ });
+ return null;
+ }
+ };
+
+ RetryUtils.CleanupAfterFailure cleanUpTheMap = new RetryUtils.CleanupAfterFailure() {
+ @Override public void cleanup() {
+ producersMap.forEach((s, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+ producersMap.clear();
+ }
+ };
+ final Predicate<Throwable>
+ isRetriable = (error) -> !KafkaUtils.exceptionIsFatal(error) && !(error instanceof ProducerFencedException);
+ try {
+ RetryUtils.retry(buildProducersTask, isRetriable, cleanUpTheMap, maxTries, "Error while building producers");
+ } catch (Exception e) {
+ // Cannot go further.
+ LOG.error("Cannot build producers due to [{}]", e.getMessage());
+ throw new MetaException(e.getMessage());
+ }
+
+ // Third stage: commit the transactions; this part is the actual critical section.
+ // The commit might be retried on error, but keep in mind that in some cases, e.g. an open transaction can expire
+ // after the timeout duration of 15 mins, it is not possible to go further.
+ final Set<String> committedTx = new HashSet<>();
+ final RetryUtils.Task<Void> commitTask = new RetryUtils.Task() {
+ @Override public Object perform() throws Exception {
+ producersMap.forEach((key, producer) -> {
+ if (!committedTx.contains(key)) {
+ producer.commitTransaction();
+ committedTx.add(key);
+ producer.close();
+ LOG.info("Committed Transaction [{}]", key);
+ }
+ });
+ return null;
+ }
+ };
+
+ try {
+ RetryUtils.retry(commitTask, isRetriable, maxTries);
+ } catch (Exception e) {
+ // At this point we are in a funky state if at least one commit happened! Close and log it.
+ producersMap.forEach((key, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+ LOG.error("Commit transaction failed", e);
+ if (committedTx.size() > 0) {
+ LOG.error("Partial Data Got Commited Some actions need to be Done");
+ committedTx.stream().forEach(key -> LOG.error("Transaction [{}] is an orphen commit", key));
+ }
+ throw new MetaException(e.getMessage());
+ }
+
+ // Stage four: clean the query working directory.
+ final RetryUtils.Task<Void> cleanQueryDirTask = new RetryUtils.Task<Void>() {
+ @Override public Void perform() throws Exception {
+ cleanWorkingDirectory(queryWorkingDir);
+ return null;
+ }
+ };
+ try {
+ RetryUtils.retry(cleanQueryDirTask, (error) -> error instanceof IOException, maxTries);
+ } catch (Exception e) {
+ // Just log it.
+ LOG.error("Failed to clean query working directory [{}] due to [{}]", queryWorkingDir, e.getMessage());
+ }
+ }
+
+ @Override public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+ if (overwrite) {
+ throw new MetaException("Kafa Table does not support the overwite SQL Smentic");
+ }
+ }
+
+ @Override public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+
+ }
+
+ @Override public void preCreateTable(Table table) throws MetaException {
+ if (!table.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
+ throw new MetaException(KAFKA_STORAGE_HANDLER + " supports only " + TableType.EXTERNAL_TABLE);
+ }
+ Arrays.stream(KafkaTableProperties.values())
+ .filter(KafkaTableProperties::isMandatory)
+ .forEach(key -> Preconditions.checkNotNull(table.getParameters().get(key.getName()),
+ "Set Table property " + key.getName()));
+ // Put in all the defaults at pre-create time.
+ Arrays.stream(KafkaTableProperties.values()).forEach((key) -> {
+ if (table.getParameters().get(key.getName()) == null) {
+ table.putToParameters(key.getName(), key.getDefaultValue());
+ }
+ });
+ }
+
+ @Override public void rollbackCreateTable(Table table) throws MetaException {
+
+ }
+
+ @Override public void commitCreateTable(Table table) throws MetaException {
+ commitInsertTable(table, false);
+ }
+
+ @Override public void preDropTable(Table table) throws MetaException {
+
+ }
+
+ @Override public void rollbackDropTable(Table table) throws MetaException {
+
+ }
+
+ @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+
+ }
+
+ private Path getQueryWorkingDir(Table table) {
+ return new Path(table.getSd().getLocation(), getQueryId());
+ }
+
+ private void cleanWorkingDirectory(Path queryWorkingDir) throws IOException {
+ FileSystem fs = FileSystem.get(getConf());
+ fs.delete(queryWorkingDir, true);
+ }
}
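
For readers following commitInsertTable above, the sketch below condenses the resume / pre-commit / commit sequence for a single transaction. It is only an illustration of the pattern used by the handler, assuming the HiveKafkaProducer wrapper exposes resumeTransaction(producerId, epoch) as in this patch; the sketch class and method names are hypothetical, and the retries, the HDFS state file, and the working-directory cleanup are omitted.

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

final class ResumeAndCommitSketch {
  /**
   * Resume the transaction left open by a writer task and commit it.
   * producerId and epoch are the values the task persisted in the query working directory.
   */
  static void resumeAndCommit(Properties baseProducerProps, String transactionalId, long producerId, short epoch) {
    baseProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    HiveKafkaProducer<byte[], byte[]> producer = new HiveKafkaProducer<>(baseProducerProps);
    try {
      // Re-attach to the open transaction using the persisted producerId/epoch.
      producer.resumeTransaction(producerId, epoch);
      // Dummy RPC used as a pre-commit check that the transaction is still resumable.
      producer.sendOffsetsToTransaction(ImmutableMap.of(), "__dry_run");
      // Critical section: make the buffered records visible to downstream consumers.
      producer.commitTransaction();
    } finally {
      producer.close(0, TimeUnit.MILLISECONDS);
    }
  }
}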
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
deleted file mode 100644
index 4802c4e..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.ReflectionUtil;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Constant, Table properties, Utilities class.
- */
-final class KafkaStreamingUtils {
-
- /**
- * MANDATORY Table property indicating kafka topic backing the table.
- */
- static final String HIVE_KAFKA_TOPIC = "kafka.topic";
- /**
- * MANDATORY Table property indicating kafka broker(s) connection string.
- */
- static final String HIVE_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
- /**
- * Table property indicating which delegate serde to be used, NOT MANDATORY defaults to {@link KafkaJsonSerDe}
- */
- static final String SERDE_CLASS_NAME = "kafka.serde.class";
- /**
- * Table property indicating poll/fetch timeout period in millis.
- * FYI this is independent from internal Kafka consumer timeouts, defaults to {@DEFAULT_CONSUMER_POLL_TIMEOUT_MS}.
- */
- static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms";
- /**
- * Default poll timeout for fetching metadata and record batch.
- */
- static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds
- /**
- * Table property prefix used to inject kafka consumer properties, e.g "kafka.consumer.max.poll.records" = "5000"
- * this will lead to inject max.poll.records=5000 to the Kafka Consumer. NOT MANDATORY defaults to nothing
- */
- static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
- /**
- * Set of Kafka properties that the user can not set via DDLs.
- */
- static final HashSet<String> FORBIDDEN_PROPERTIES =
- new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
-
- private KafkaStreamingUtils() {
- }
-
- /**
- * @param configuration Job configs
- *
- * @return default consumer properties
- */
- static Properties consumerProperties(Configuration configuration) {
- final Properties props = new Properties();
- // we are managing the commit offset
- props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- // we are seeking in the stream so no reset
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
- String brokerEndPoint = configuration.get(HIVE_KAFKA_BOOTSTRAP_SERVERS);
- if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
- throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config "
- + HIVE_KAFKA_BOOTSTRAP_SERVERS);
- }
- props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- // user can always override stuff
- final Map<String, String>
- kafkaProperties =
- configuration.getValByRegex("^" + CONSUMER_CONFIGURATION_PREFIX + "\\..*");
- for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
- String key = entry.getKey().substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1);
- if (FORBIDDEN_PROPERTIES.contains(key)) {
- throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
- }
- props.setProperty(key, entry.getValue());
- }
- return props;
- }
-
- static void copyDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
- Set<String> jars = new HashSet<>();
- FileSystem localFs = FileSystem.getLocal(conf);
- jars.addAll(conf.getStringCollection("tmpjars"));
- jars.addAll(Arrays.stream(classes).filter(Objects::nonNull).map(clazz -> {
- String path = Utilities.jarFinderGetJar(clazz);
- if (path == null) {
- throw new RuntimeException("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
- }
- try {
- if (!localFs.exists(new Path(path))) {
- throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return path;
- }).collect(Collectors.toList()));
-
- if (jars.isEmpty()) {
- return;
- }
- conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
- }
-
- static AbstractSerDe createDelegate(String className) {
- final Class<? extends AbstractSerDe> clazz;
- try {
- //noinspection unchecked
- clazz = (Class<? extends AbstractSerDe>) Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- // we are not setting conf thus null is okay
- return ReflectionUtil.newInstance(clazz, null);
- }
-
- /**
- * Basic Enum class for all the metadata columns appended to the Kafka row by the deserializer.
- */
- enum MetadataColumn {
- /**
- * Record offset column name added as extra metadata column to row as long.
- */
- OFFSET("__offset", TypeInfoFactory.longTypeInfo),
- /**
- * Record Kafka Partition column name added as extra meta column of type int.
- */
- PARTITION("__partition", TypeInfoFactory.intTypeInfo),
- /**
- * Record Kafka key column name added as extra meta column of type binary blob.
- */
- KEY("__key", TypeInfoFactory.binaryTypeInfo),
- /**
- * Record Timestamp column name, added as extra meta column of type long.
- */
- TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
- /**
- * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner.
- */
- START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
- /**
- * End offset given by input split at run time.
- */
- END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
-
- private final String name;
- private final TypeInfo typeInfo;
-
- MetadataColumn(String name, TypeInfo typeInfo) {
- this.name = name;
- this.typeInfo = typeInfo;
- }
-
- public String getName() {
- return name;
- }
-
- public AbstractPrimitiveWritableObjectInspector getObjectInspector() {
- return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
- typeInfo.getTypeName()));
- }
- }
-
- //Order at which column and types will be appended to the original row.
- /**
- * Kafka metadata columns order list.
- */
- private static final List<MetadataColumn> KAFKA_METADATA_COLUMNS =
- Arrays.asList(MetadataColumn.KEY,
- MetadataColumn.PARTITION,
- MetadataColumn.OFFSET,
- MetadataColumn.TIMESTAMP,
- MetadataColumn.START_OFFSET,
- MetadataColumn.END_OFFSET);
-
- /**
- * Kafka metadata column names.
- */
- static final List<String> KAFKA_METADATA_COLUMN_NAMES = KAFKA_METADATA_COLUMNS
- .stream()
- .map(MetadataColumn::getName)
- .collect(Collectors.toList());
-
- /**
- * Kafka metadata column inspectors.
- */
- static final List<ObjectInspector> KAFKA_METADATA_INSPECTORS = KAFKA_METADATA_COLUMNS
- .stream()
- .map(MetadataColumn::getObjectInspector)
- .collect(Collectors.toList());
-
- /**
- * Reverse lookup map used to convert records from kafka Writable to hive Writable based on Kafka semantic.
- */
- static final Map<String, Function<KafkaRecordWritable, Writable>>
- recordWritableFnMap = ImmutableMap.<String, Function<KafkaRecordWritable, Writable>>builder()
- .put(MetadataColumn.END_OFFSET.getName(), (record) -> new LongWritable(record.getEndOffset()))
- .put(MetadataColumn.KEY.getName(),
- record -> record.getRecordKey() == null ? null : new BytesWritable(record.getRecordKey()))
- .put(MetadataColumn.OFFSET.getName(), record -> new LongWritable(record.getOffset()))
- .put(MetadataColumn.PARTITION.getName(), record -> new IntWritable(record.getPartition()))
- .put(MetadataColumn.START_OFFSET.getName(), record -> new LongWritable(record.getStartOffset()))
- .put(MetadataColumn.TIMESTAMP.getName(), record -> new LongWritable(record.getTimestamp()))
- .build();
-}
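
One detail worth calling out from the deleted utility class (and preserved in the new KafkaUtils / KafkaStorageHandler code above) is how table properties prefixed with "kafka.consumer." are stripped and passed through to the consumer, e.g. "kafka.consumer.max.poll.records" = "5000" becomes max.poll.records=5000. The snippet below shows that translation in isolation; it is a sketch only, the class name is hypothetical, and the forbidden-property set mirrors FORBIDDEN_PROPERTIES above.

import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

final class ConsumerOverridesSketch {
  private static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
  private static final Set<String> FORBIDDEN_PROPERTIES =
      ImmutableSet.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

  /** Turns "kafka.consumer.max.poll.records" = "5000" into "max.poll.records" = "5000". */
  static Properties extractConsumerOverrides(Map<String, String> tableParameters) {
    Properties overrides = new Properties();
    tableParameters.forEach((key, value) -> {
      if (key.toLowerCase().startsWith(CONSUMER_CONFIGURATION_PREFIX)) {
        // Drop the "kafka.consumer." prefix, keep the raw Kafka property name.
        String kafkaKey = key.substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1);
        if (FORBIDDEN_PROPERTIES.contains(kafkaKey)) {
          throw new IllegalArgumentException("Not allowed to set Kafka property " + kafkaKey);
        }
        overrides.setProperty(kafkaKey, value);
      }
    });
    return overrides;
  }
}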
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
new file mode 100644
index 0000000..2e1f6fa
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.hive.serde2.JsonSerDe;
+
+/**
+ * Table properties used by Kafka Storage handler.
+ */
+enum KafkaTableProperties {
+ /**
+ * MANDATORY Table property indicating kafka topic backing the table.
+ */
+ HIVE_KAFKA_TOPIC("kafka.topic", null),
+ /**
+ * MANDATORY Table property indicating kafka broker(s) connection string.
+ */
+ HIVE_KAFKA_BOOTSTRAP_SERVERS("kafka.bootstrap.servers", null),
+ /**
+ * Table property indicating which delegate serde to be used.
+ */
+ SERDE_CLASS_NAME("kafka.serde.class", JsonSerDe.class.getName()),
+ /**
+ * Table property indicating poll/fetch timeout period in millis.
+ * FYI this is independent of the internal Kafka consumer timeouts.
+ */
+ KAFKA_POLL_TIMEOUT("hive.kafka.poll.timeout.ms", "5000"),
+
+ MAX_RETRIES("hive.kafka.max.retries", "6"),
+ KAFKA_FETCH_METADATA_TIMEOUT("hive.kafka.metadata.poll.timeout.ms", "30000"),
+ /**
+ * Table property indicating the write semantic possible enum values are:
+ * {@link KafkaOutputFormat.WriteSemantic}.
+ */
+ WRITE_SEMANTIC_PROPERTY("kafka.write.semantic", KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE.name()),
+ /**
+ * Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call.
+ */
+ HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false");
+
+ /**
+ * Kafka storage handler table properties constructor.
+ * @param name property name.
+ * @param defaultValue default value, set to NULL if the property is mandatory and needs to be set by the user.
+ */
+ KafkaTableProperties(String name, String defaultValue) {
+ this.name = name;
+ this.defaultValue = defaultValue;
+ this.mandatory = defaultValue == null;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public boolean isMandatory() {
+ return mandatory;
+ }
+
+ private final String name;
+ private final String defaultValue;
+ private final boolean mandatory;
+}
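
As a usage note, this enum is what preCreateTable above relies on to validate the mandatory properties and to fill in the defaults. The sketch below exercises the same logic against a plain parameter map; it is illustrative only (the class, the main method, and the sample values are not part of the patch), and it would need to live in the org.apache.hadoop.hive.kafka package since the enum is package-private.

import com.google.common.base.Preconditions;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class TablePropertiesSketch {
  /** Validate mandatory properties and fill in the defaults, mirroring preCreateTable. */
  static void validateAndApplyDefaults(Map<String, String> tableParameters) {
    Arrays.stream(KafkaTableProperties.values())
        .filter(KafkaTableProperties::isMandatory)
        .forEach(key -> Preconditions.checkNotNull(tableParameters.get(key.getName()),
            "Set Table property " + key.getName()));
    Arrays.stream(KafkaTableProperties.values())
        .forEach(key -> tableParameters.putIfAbsent(key.getName(), key.getDefaultValue()));
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("kafka.topic", "test-topic");                  // mandatory, no default
    params.put("kafka.bootstrap.servers", "localhost:9092");  // mandatory, no default
    validateAndApplyDefaults(params);
    // params now also carries the defaults, e.g. kafka.write.semantic=AT_LEAST_ONCE
    // and hive.kafka.poll.timeout.ms=5000.
    params.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}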