Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:19 UTC
[3/5] hive git commit: HIVE-20639 : Add ability to Write Data from
Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
new file mode 100644
index 0000000..6ae9c8d
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Utils class for Kafka Storage handler plus some Constants.
+ */
+final class KafkaUtils {
+
+ private KafkaUtils() {
+ }
+
+ /**
+ * Table property prefix used to inject Kafka consumer properties, e.g. "kafka.consumer.max.poll.records" = "5000"
+ * will set max.poll.records=5000 on the Kafka consumer. Not mandatory; defaults to nothing.
+ */
+ static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
+
+ /**
+ * Table property prefix used to inject kafka producer properties, e.g "kafka.producer.lingers.ms" = "100".
+ */
+ static final String PRODUCER_CONFIGURATION_PREFIX = "kafka.producer";
+
+ /**
+ * Set of Kafka properties that the user cannot set via DDLs.
+ */
+ static final Set<String>
+ FORBIDDEN_PROPERTIES =
+ new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+
+ /**
+ * @param configuration Job configs
+ *
+ * @return default consumer properties
+ */
+ static Properties consumerProperties(Configuration configuration) {
+ final Properties props = new Properties();
+ // we are managing the commit offset
+ props.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(configuration));
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ // we are seeking in the stream so no reset
+ props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
+ throw new IllegalArgumentException("Kafka broker endpoint is missing. Please set config "
+ + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ }
+ props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ // user-provided table properties can always override the defaults above
+ props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
+ return props;
+ }
+
+ private static Map<String, String> extractExtraProperties(final Configuration configuration, String prefix) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ final Map<String, String> kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
+ for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
+ String key = entry.getKey().substring(prefix.length() + 1);
+ if (FORBIDDEN_PROPERTIES.contains(key)) {
+ throw new IllegalArgumentException("Not allowed to set Kafka property " + key);
+ }
+ builder.put(key, entry.getValue());
+ }
+ return builder.build();
+ }
+
+ static Properties producerProperties(Configuration configuration) {
+ final String writeSemanticValue = configuration.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName());
+ final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.valueOf(writeSemanticValue);
+ final Properties properties = new Properties();
+ String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
+ throw new IllegalArgumentException("Kafka broker endpoint is missing. Please set config "
+ + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+ }
+ properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+ // user-provided table properties can always override the defaults above
+ properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
+ String taskId = configuration.get("mapred.task.id", null);
+ properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
+ taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
+ switch (writeSemantic) {
+ case BEST_EFFORT:
+ break;
+ case AT_LEAST_ONCE:
+ properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
+ //The number of acknowledgments the producer requires the leader to have received before considering a request as
+ // complete; "all" means acknowledgment from all in-sync replicas.
+ properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
+ break;
+ case EXACTLY_ONCE:
+ // Assuming that the task id has the form reducerId_attemptId; the reducer id is needed to fence out zombie Kafka producers.
+ String reducerId = getTaskId(configuration);
+ //The number of acknowledgments the producer requires the leader to have received before considering a request as
+ // complete; "all" means acknowledgment from all in-sync replicas.
+ properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
+ properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
+ properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, reducerId);
+ //Producer set to be idempotent, i.e. ensure that send() retries do not introduce duplicates.
+ properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown Semantic " + writeSemantic);
+ }
+ return properties;
+ }
+
+ @SuppressWarnings("SameParameterValue") static void copyDependencyJars(Configuration conf, Class<?>... classes)
+ throws IOException {
+ Set<String> jars = new HashSet<>();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ jars.addAll(conf.getStringCollection("tmpjars"));
+ jars.addAll(Arrays.stream(classes)
+ .filter(Objects::nonNull)
+ .map(clazz -> {
+ String path = Utilities.jarFinderGetJar(clazz);
+ if (path == null) {
+ throw new RuntimeException("Could not find jar for class "
+ + clazz
+ + " in order to ship it to the cluster.");
+ }
+ try {
+ if (!localFs.exists(new Path(path))) {
+ throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return path;
+ }).collect(Collectors.toList()));
+
+ if (jars.isEmpty()) {
+ return;
+ }
+ conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+ }
+
+ static AbstractSerDe createDelegate(String className) {
+ final Class<? extends AbstractSerDe> clazz;
+ try {
+ //noinspection unchecked
+ clazz = (Class<? extends AbstractSerDe>) Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ // no configuration is needed here, thus null is okay
+ return ReflectionUtil.newInstance(clazz, null);
+ }
+
+ static ProducerRecord<byte[], byte[]> toProducerRecord(String topic, KafkaWritable value) {
+ return new ProducerRecord<>(topic,
+ value.getPartition() != -1 ? value.getPartition() : null,
+ value.getTimestamp() != -1L ? value.getTimestamp() : null,
+ value.getRecordKey(),
+ value.getValue());
+ }
+
+ /**
+ * Checks whether the exception is non-retriable, i.e. a show stopper; all we can do is clean up and exit.
+ * @param exception input exception object.
+ * @return true if the exception is fatal, thus we can only abort and rethrow the cause.
+ */
+ static boolean exceptionIsFatal(final Throwable exception) {
+ final boolean
+ securityException =
+ exception instanceof AuthenticationException
+ || exception instanceof AuthorizationException
+ || exception instanceof SecurityDisabledException;
+
+ final boolean
+ communicationException =
+ exception instanceof InvalidTopicException
+ || exception instanceof UnknownServerException
+ || exception instanceof SerializationException
+ || exception instanceof OffsetMetadataTooLarge
+ || exception instanceof IllegalStateException;
+
+ return securityException || communicationException;
+ }
+
+ /**
+ * Computes the Kafka producer transaction id. The transaction id HAS to be the same across task restarts,
+ * which is why the attempt id is excluded by stripping everything after the last `_`.
+ * Assuming the taskId format is taskId_[m-r]_attemptId.
+ *
+ * @param hiveConf Hive Configuration.
+ * @return the taskId without the attempt id.
+ */
+ static String getTaskId(Configuration hiveConf) {
+ String id = Preconditions.checkNotNull(hiveConf.get("mapred.task.id", null));
+ int index = id.lastIndexOf("_");
+ if (index != -1) {
+ return id.substring(0, index);
+ }
+ return id;
+ }
+
+}
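For reference, a minimal usage sketch of the utilities above (not part of the patch; it assumes the usual Hadoop/Kafka imports, and the "kafka.bootstrap.servers" key plus the task id value are illustrative assumptions about how the storage handler populates the job configuration):

  // Hypothetical job configuration, for illustration only.
  Configuration conf = new Configuration();
  conf.set("kafka.bootstrap.servers", "localhost:9092");                // assumed broker property key
  conf.set("mapred.task.id", "attempt_1539000000000_0001_r_000001_0");
  conf.set("kafka.consumer.max.poll.records", "5000");                  // prefixed property, forwarded as max.poll.records

  Properties consumerProps = KafkaUtils.consumerProperties(conf);
  // consumerProps now carries max.poll.records=5000 plus the fixed defaults:
  // enable.auto.commit=false, auto.offset.reset=none, byte-array deserializers.

  // getTaskId() strips the trailing attempt id so the producer transactional id
  // stays stable across task restarts.
  String txId = KafkaUtils.getTaskId(conf);  // "attempt_1539000000000_0001_r_000001"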
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
new file mode 100644
index 0000000..681b666
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Writable implementation of Kafka ConsumerRecord.
+ * Serialized in the form:
+ * {@code timestamp} (long) | {@code partition} (int) | {@code offset} (long) |
+ * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) |
+ * {@code value} (byte []) | {@code recordKey.size()} (int) | {@code recordKey} (byte [])
+ */
+public class KafkaWritable implements Writable {
+
+ private int partition;
+ private long offset;
+ private long timestamp;
+ private byte[] value;
+ private byte[] recordKey;
+
+ /**
+ * First offset given by the input split used to pull the event {@link KafkaInputSplit#getStartOffset()}.
+ */
+ private long startOffset;
+ /**
+ * Last Offset given by the input split used to pull the event {@link KafkaInputSplit#getEndOffset()}.
+ */
+ private long endOffset;
+
+ void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, long endOffset) {
+ this.partition = consumerRecord.partition();
+ this.timestamp = consumerRecord.timestamp();
+ this.offset = consumerRecord.offset();
+ this.value = consumerRecord.value();
+ this.recordKey = consumerRecord.key();
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ KafkaWritable(int partition,
+ long offset,
+ long timestamp,
+ byte[] value,
+ long startOffset,
+ long endOffset,
+ @Nullable byte[] recordKey) {
+ this.partition = partition;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.value = value;
+ this.recordKey = recordKey;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ KafkaWritable(int partition, long timestamp, byte[] value, @Nullable byte[] recordKey) {
+ this(partition, -1, timestamp, value, -1, -1, recordKey);
+ }
+
+ @SuppressWarnings("WeakerAccess") public KafkaWritable() {
+ }
+
+ @Override public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(timestamp);
+ dataOutput.writeInt(partition);
+ dataOutput.writeLong(offset);
+ dataOutput.writeLong(startOffset);
+ dataOutput.writeLong(endOffset);
+ dataOutput.writeInt(value.length);
+ dataOutput.write(value);
+ if (recordKey != null) {
+ dataOutput.writeInt(recordKey.length);
+ dataOutput.write(recordKey);
+ } else {
+ dataOutput.writeInt(-1);
+ }
+ }
+
+ @Override public void readFields(DataInput dataInput) throws IOException {
+ timestamp = dataInput.readLong();
+ partition = dataInput.readInt();
+ offset = dataInput.readLong();
+ startOffset = dataInput.readLong();
+ endOffset = dataInput.readLong();
+ int dataSize = dataInput.readInt();
+ if (dataSize > 0) {
+ value = new byte[dataSize];
+ dataInput.readFully(value);
+ } else {
+ value = new byte[0];
+ }
+ int keyArraySize = dataInput.readInt();
+ if (keyArraySize > -1) {
+ recordKey = new byte[keyArraySize];
+ dataInput.readFully(recordKey);
+ } else {
+ recordKey = null;
+ }
+ }
+
+ int getPartition() {
+ return partition;
+ }
+
+ @SuppressWarnings("WeakerAccess") long getOffset() {
+ return offset;
+ }
+
+ long getTimestamp() {
+ return timestamp;
+ }
+
+ byte[] getValue() {
+ return value;
+ }
+
+ @SuppressWarnings("WeakerAccess") long getStartOffset() {
+ return startOffset;
+ }
+
+ @SuppressWarnings("WeakerAccess") long getEndOffset() {
+ return endOffset;
+ }
+
+ @Nullable byte[] getRecordKey() {
+ return recordKey;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KafkaWritable)) {
+ return false;
+ }
+ KafkaWritable writable = (KafkaWritable) o;
+ return partition == writable.partition
+ && offset == writable.offset
+ && startOffset == writable.startOffset
+ && endOffset == writable.endOffset
+ && timestamp == writable.timestamp
+ && Arrays.equals(value, writable.value)
+ && Arrays.equals(recordKey, writable.recordKey);
+ }
+
+ @Override public int hashCode() {
+ int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp);
+ result = 31 * result + Arrays.hashCode(value);
+ result = 31 * result + Arrays.hashCode(recordKey);
+ return result;
+ }
+
+ @Override public String toString() {
+ return "KafkaWritable{"
+ + "partition="
+ + partition
+ + ", offset="
+ + offset
+ + ", startOffset="
+ + startOffset
+ + ", endOffset="
+ + endOffset
+ + ", timestamp="
+ + timestamp
+ + ", value="
+ + Arrays.toString(value)
+ + ", recordKey="
+ + Arrays.toString(recordKey)
+ + '}';
+ }
+
+ Writable getHiveWritable(MetadataColumn metadataColumn) {
+ switch (metadataColumn) {
+ case OFFSET:
+ return new LongWritable(getOffset());
+ case PARTITION:
+ return new IntWritable(getPartition());
+ case TIMESTAMP:
+ return new LongWritable(getTimestamp());
+ case KEY:
+ return getRecordKey() == null ? null : new BytesWritable(getRecordKey());
+ case START_OFFSET:
+ return new LongWritable(getStartOffset());
+ case END_OFFSET:
+ return new LongWritable(getEndOffset());
+ default:
+ throw new IllegalArgumentException("Unknown metadata column [" + metadataColumn.getName() + "]");
+ }
+ }
+
+}
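A short round-trip sketch of the Writable contract above (illustrative only; it uses Hadoop's DataOutputBuffer/DataInputBuffer and assumes same-package access to the package-private constructor):

  KafkaWritable out = new KafkaWritable(0,                               // partition
      System.currentTimeMillis(),                                        // timestamp
      "payload".getBytes(StandardCharsets.UTF_8),                        // value
      "key".getBytes(StandardCharsets.UTF_8));                           // record key (nullable)

  DataOutputBuffer outBuffer = new DataOutputBuffer();
  out.write(outBuffer);

  DataInputBuffer inBuffer = new DataInputBuffer();
  inBuffer.reset(outBuffer.getData(), outBuffer.getLength());

  KafkaWritable in = new KafkaWritable();
  in.readFields(inBuffer);
  // in.equals(out) is expected to hold; metadata can then be extracted, e.g.
  // in.getHiveWritable(MetadataColumn.TIMESTAMP) yields a LongWritable.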
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
new file mode 100644
index 0000000..60e1aea
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Enum class for all the metadata columns appended to the Kafka row by the Hive Serializer/Deserializer.
+ *
+ * <p>
+ *<b>Design Notes:</b>
+ *
+ * It is important to note that the order in which columns are appended matters; the order is governed by:
+ * {@link MetadataColumn#KAFKA_METADATA_COLUMNS}.
+ *
+ * If you add a new column, make sure to add its Writable converter to {@link KafkaWritable}.
+ *
+ */
+enum MetadataColumn {
+
+ /**
+ * Kafka Record's offset column name added as extra metadata column to row as long.
+ */
+ OFFSET("__offset", TypeInfoFactory.longTypeInfo),
+ /**
+ * Record Kafka Partition column name added as extra meta column of type int.
+ */
+ PARTITION("__partition", TypeInfoFactory.intTypeInfo),
+ /**
+ * Record Kafka key column name added as extra meta column of type binary blob.
+ */
+ KEY("__key", TypeInfoFactory.binaryTypeInfo),
+ /**
+ * Record Timestamp column name, added as extra meta column of type long.
+ */
+ TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
+ /**
+ * Start offset given by the input split; this reflects the actual start of the topic partition or the start given by the split pruner.
+ */
+ // @TODO To be removed in the next PR; it is here to make review easier
+ START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
+ /**
+ * End offset given by input split at run time.
+ */
+ // @TODO To be removed in the next PR; it is here to make review easier
+ END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
+
+ /**
+ * Kafka metadata columns list that indicates the order of appearance for each column in final row.
+ */
+ private static final List<MetadataColumn>
+ KAFKA_METADATA_COLUMNS =
+ Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP, START_OFFSET, END_OFFSET);
+
+ static final List<ObjectInspector>
+ KAFKA_METADATA_INSPECTORS =
+ KAFKA_METADATA_COLUMNS.stream().map(MetadataColumn::getObjectInspector).collect(Collectors.toList());
+
+ static final List<String>
+ KAFKA_METADATA_COLUMN_NAMES =
+ KAFKA_METADATA_COLUMNS.stream().map(MetadataColumn::getName).collect(Collectors.toList());
+
+ private final String name;
+ private final TypeInfo typeInfo;
+
+ MetadataColumn(String name, TypeInfo typeInfo) {
+ this.name = name;
+ this.typeInfo = typeInfo;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public AbstractPrimitiveWritableObjectInspector getObjectInspector() {
+ return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
+ typeInfo.getTypeName()));
+ }
+
+ private static final Map<String, MetadataColumn>
+ NAMES_MAP =
+ Arrays.stream(MetadataColumn.values()).collect(Collectors.toMap(MetadataColumn::getName, Function.identity()));
+ /**
+ * Column name to MetadataColumn instance.
+ * @param name column name.
+ * @return instance of {@link MetadataColumn}, or null if no such column exists
+ */
+ @Nullable
+ static MetadataColumn forName(String name) {
+ return NAMES_MAP.get(name);
+ }
+
+}
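A small lookup sketch for the enum above (illustrative only; assumes same-package access):

  // Resolve a metadata column by its name; unknown names return null.
  MetadataColumn column = MetadataColumn.forName("__partition");
  assert column == MetadataColumn.PARTITION;
  assert MetadataColumn.forName("__does_not_exist") == null;

  // KAFKA_METADATA_COLUMN_NAMES fixes the order in which the metadata columns are
  // appended to each row: __key, __partition, __offset, __timestamp, __start_offset, __end_offset.
  System.out.println(MetadataColumn.KAFKA_METADATA_COLUMN_NAMES);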
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
new file mode 100644
index 0000000..b2bb208
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Predicate;
+
+/**
+ * Retry utils class, mostly taken from the Apache Druid project (org.apache.druid.java.util.common.RetryUtils).
+ */
+public final class RetryUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);
+ private static final long MAX_SLEEP_MILLIS = 60000;
+ private static final long BASE_SLEEP_MILLIS = 1000;
+
+ private RetryUtils() {
+ }
+
+ /**
+ * Task to be performed.
+ * @param <T> returned type of the task.
+ */
+ public interface Task<T> {
+ /**
+ * This method is tried up to maxTries times until it succeeds.
+ */
+ T perform() throws Exception;
+ }
+
+ /**
+ * Cleanup procedure after each failed attempt.
+ */
+ @SuppressWarnings("WeakerAccess") public interface CleanupAfterFailure {
+ /**
+ * This is called once {@link Task#perform()} fails. Retrying is stopped once this method throws an exception,
+ * so errors inside this method should be ignored if you don't want to stop retrying.
+ */
+ void cleanup();
+ }
+
+ /**
+ * Retry an operation using fuzzy exponentially increasing backoff. The wait time after the nth failed attempt is
+ * min(60000ms, 1000ms * pow(2, n - 1)), fuzzed by a number drawn from a Gaussian distribution with mean 0 and
+ * standard deviation 0.2.
+ *
+ * If maxTries is exhausted, or if shouldRetry returns false, the last exception thrown by "f" will be thrown
+ * by this function.
+ *
+ * @param f the operation
+ * @param shouldRetry predicate determining whether we should retry after a particular exception thrown by "f"
+ * @param quietTries first quietTries attempts will LOG exceptions at DEBUG level rather than WARN
+ * @param maxTries maximum number of attempts
+ *
+ * @return result of the first successful operation
+ *
+ * @throws Exception if maxTries is exhausted, or shouldRetry returns false
+ */
+ @SuppressWarnings("WeakerAccess") static <T> T retry(final Task<T> f,
+ final Predicate<Throwable> shouldRetry,
+ final int quietTries,
+ final int maxTries,
+ @Nullable final CleanupAfterFailure cleanupAfterFailure,
+ @Nullable final String messageOnRetry) throws Exception {
+ Preconditions.checkArgument(maxTries > 0, "maxTries > 0");
+ Preconditions.checkArgument(quietTries >= 0, "quietTries >= 0");
+ int nTry = 0;
+ final int maxRetries = maxTries - 1;
+ while (true) {
+ try {
+ nTry++;
+ return f.perform();
+ } catch (Throwable e) {
+ if (cleanupAfterFailure != null) {
+ cleanupAfterFailure.cleanup();
+ }
+ if (nTry < maxTries && shouldRetry.test(e)) {
+ awaitNextRetry(e, messageOnRetry, nTry, maxRetries, nTry <= quietTries);
+ } else {
+ Throwables.propagateIfInstanceOf(e, Exception.class);
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+ }
+
+ static <T> T retry(final Task<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception {
+ return retry(f, shouldRetry, 0, maxTries);
+ }
+
+ @SuppressWarnings({ "WeakerAccess", "SameParameterValue" }) static <T> T retry(final Task<T> f,
+ final Predicate<Throwable> shouldRetry,
+ final int quietTries,
+ final int maxTries) throws Exception {
+ return retry(f, shouldRetry, quietTries, maxTries, null, null);
+ }
+
+ @SuppressWarnings("unused") public static <T> T retry(final Task<T> f,
+ final Predicate<Throwable> shouldRetry,
+ final CleanupAfterFailure onEachFailure,
+ final int maxTries,
+ final String messageOnRetry) throws Exception {
+ return retry(f, shouldRetry, 0, maxTries, onEachFailure, messageOnRetry);
+ }
+
+ private static void awaitNextRetry(final Throwable e,
+ @Nullable final String messageOnRetry,
+ final int nTry,
+ final int maxRetries,
+ final boolean quiet) throws InterruptedException {
+ final long sleepMillis = nextRetrySleepMillis(nTry);
+ final String fullMessage;
+
+ if (messageOnRetry == null) {
+ fullMessage = String.format("Retrying (%d of %d) in %,dms.", nTry, maxRetries, sleepMillis);
+ } else {
+ fullMessage = String.format("%s, retrying (%d of %d) in %,dms.", messageOnRetry, nTry, maxRetries, sleepMillis);
+ }
+
+ if (quiet) {
+ LOG.debug(fullMessage, e);
+ } else {
+ LOG.warn(fullMessage, e);
+ }
+
+ Thread.sleep(sleepMillis);
+ }
+
+ private static long nextRetrySleepMillis(final int nTry) {
+ final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
+ return (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1)) * fuzzyMultiplier);
+ }
+}
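A usage sketch for the retry helper above (illustrative only; `producer` and `record` are placeholders for a Kafka producer and a record to send, and the usual imports are assumed):

  // Retry a flaky send up to 5 times, backing off only on non-fatal exceptions.
  RecordMetadata metadata = RetryUtils.retry(
      () -> producer.send(record).get(),          // RetryUtils.Task<RecordMetadata>
      e -> !KafkaUtils.exceptionIsFatal(e),       // shouldRetry predicate
      5);                                         // maxTries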
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
new file mode 100644
index 0000000..c95bdb0
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Simple Hive to Kafka record writer. It can be used to achieve AT_LEAST_ONCE semantics, or no guarantees at all.
+ */
+class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<BytesWritable, KafkaWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaWriter.class);
+ private static final String
+ TIMEOUT_CONFIG_HINT =
+ "Try increasing producer property [`retries`] and [`retry.backoff.ms`] to avoid this error [{}].";
+ private static final String
+ ABORT_MSG =
+ "Writer [%s] aborting Send. Caused by [%s]. Sending to topic [%s]. Record offset [%s];";
+ private static final String
+ ACTION_ABORT =
+ "WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] -> ACTION=ABORT, ERROR caused by [{}]";
+ private static final String
+ ACTION_CARRY_ON =
+ "WriterId [{}], lost record from Topic [{}], delivery Semantic [{}] -> ACTION=CARRY-ON";
+
+ private final String topic;
+ private final String writerId;
+ private final KafkaOutputFormat.WriteSemantic writeSemantic;
+ private final KafkaProducer<byte[], byte[]> producer;
+ private final Callback callback;
+ private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
+ private final AtomicLong lostRecords = new AtomicLong(0L);
+ private long sentRecords = 0L;
+
+ /**
+ * @param topic Kafka Topic.
+ * @param writerId Writer id used for logging.
+ * @param atLeastOnce true if the desired delivery semantic is at least once.
+ * @param properties Kafka Producer properties.
+ */
+ SimpleKafkaWriter(String topic, @Nullable String writerId, boolean atLeastOnce, Properties properties) {
+ this.writeSemantic =
+ atLeastOnce ? KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE : KafkaOutputFormat.WriteSemantic.BEST_EFFORT;
+ this.writerId = writerId == null ? UUID.randomUUID().toString() : writerId;
+ this.topic = Preconditions.checkNotNull(topic, "Topic cannot be null");
+ Preconditions.checkState(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+ "set [" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + "] property");
+ producer = new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer());
+
+ this.callback = (metadata, exception) -> {
+ if (exception != null) {
+ lostRecords.getAndIncrement();
+ switch (writeSemantic) {
+ case BEST_EFFORT:
+ LOG.warn(ACTION_CARRY_ON, getWriterId(), topic, writeSemantic);
+ break;
+ case AT_LEAST_ONCE:
+ LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage());
+ sendExceptionRef.compareAndSet(null, exception);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported delivery semantic " + writeSemantic);
+ }
+ }
+ };
+ LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]",
+ writerId, writeSemantic,
+ topic);
+ }
+
+ @Override public void write(Writable w) throws IOException {
+ checkExceptions();
+ try {
+ sentRecords++;
+ producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) w), callback);
+ } catch (KafkaException kafkaException) {
+ handleKafkaException(kafkaException);
+ checkExceptions();
+ }
+ }
+
+ private void handleKafkaException(KafkaException kafkaException) {
+ if (kafkaException instanceof TimeoutException) {
+ //This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up.
+ LOG.error(TIMEOUT_CONFIG_HINT, kafkaException.getMessage());
+ }
+ if (KafkaUtils.exceptionIsFatal(kafkaException)) {
+ LOG.error(String.format(ABORT_MSG, writerId, kafkaException.getMessage(), topic, -1L));
+ sendExceptionRef.compareAndSet(null, kafkaException);
+ } else {
+ if (writeSemantic == KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE) {
+ LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage());
+ sendExceptionRef.compareAndSet(null, kafkaException);
+ } else {
+ LOG.warn(ACTION_CARRY_ON, writerId, topic, writeSemantic);
+ }
+ }
+ }
+
+ @Override public void close(boolean abort) throws IOException {
+ if (abort) {
+ LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId);
+ producer.close(0, TimeUnit.MICROSECONDS);
+ return;
+ } else {
+ LOG.info("Flushing Kafka Producer with writerId [{}]", writerId);
+ producer.flush();
+ LOG.info("Closing WriterId [{}]", writerId);
+ producer.close();
+ }
+ LOG.info("Closed WriterId [{}] Delivery semantic [{}], Topic[{}], Total sent Records [{}], Total Lost Records [{}]",
+ writerId, writeSemantic,
+ topic,
+ sentRecords,
+ lostRecords.get());
+ checkExceptions();
+ }
+
+ @VisibleForTesting String getWriterId() {
+ return writerId;
+ }
+
+ @VisibleForTesting long getLostRecords() {
+ return lostRecords.get();
+ }
+
+ @VisibleForTesting long getSentRecords() {
+ return sentRecords;
+ }
+
+ @Override public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
+ this.write(kafkaWritable);
+ }
+
+ @Override public void close(Reporter reporter) throws IOException {
+ this.close(false);
+ }
+
+ private void checkExceptions() throws IOException {
+ if (sendExceptionRef.get() != null) {
+ LOG.error("Send Exception Aborting write from writerId [{}]", writerId);
+ producer.close(0, TimeUnit.MICROSECONDS);
+ throw new IOException(sendExceptionRef.get());
+ }
+ }
+
+}
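A minimal construction sketch for the writer above (illustrative only; the broker address and topic are placeholders, and same-package access is assumed):

  Properties producerProps = new Properties();
  producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

  SimpleKafkaWriter writer = new SimpleKafkaWriter("test_topic", "writer-0",
      true /* at least once */, producerProps);

  KafkaWritable record = new KafkaWritable(-1,                           // let Kafka pick the partition
      System.currentTimeMillis(),
      "hello".getBytes(StandardCharsets.UTF_8),
      null);                                                             // no record key
  writer.write(record);
  writer.close(false);  // flush, close, and surface any asynchronous send failure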
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
new file mode 100644
index 0000000..fb4d034
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Transactional Kafka record writer used to achieve exactly-once semantics.
+ */
+class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<BytesWritable, KafkaWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
+ private static final String TRANSACTION_DIR = "transaction_states";
+
+ private final String topic;
+ private final HiveKafkaProducer<byte[], byte[]> producer;
+ private final Callback callback;
+ private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
+ private final Path openTxFileName;
+ private final boolean optimisticCommit;
+ private final FileSystem fileSystem;
+ private final Map<TopicPartition, Long> offsets = new HashMap<>();
+ private final String writerIdTopicId;
+ private final long producerId;
+ private final short producerEpoch;
+ private long sentRecords = 0L;
+
+ /**
+ * @param topic Kafka topic.
+ * @param producerProperties kafka producer properties.
+ * @param queryWorkingPath the query working directory, i.e. table_directory/hive_query_id.
+ * Used to store the state of the transaction and/or log sent records and partitions.
+ * for more information see:
+ * {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}
+ * @param fileSystem file system handler.
+ * @param optimisticCommit if true the commit will happen at the task level, otherwise it will be delegated to HS2.
+ */
+ TransactionalKafkaWriter(String topic, Properties producerProperties,
+ Path queryWorkingPath,
+ FileSystem fileSystem,
+ @Nullable Boolean optimisticCommit) {
+ this.fileSystem = fileSystem;
+ this.topic = Preconditions.checkNotNull(topic, "NULL topic !!");
+
+ Preconditions.checkState(producerProperties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+ "set [" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + "] property");
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ this.producer = new HiveKafkaProducer<>(producerProperties);
+ this.optimisticCommit = optimisticCommit == null ? true : optimisticCommit;
+ this.callback = (metadata, exception) -> {
+ if (exception != null) {
+ sendExceptionRef.compareAndSet(null, exception);
+ } else {
+ //According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
+ //Callbacks from the same TopicPartition return in order, thus this keeps track of the most recent offset.
+ final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+ offsets.put(tp, metadata.offset());
+ }
+ };
+ // Start Tx
+ assert producer.getTransactionalId() != null;
+ try {
+ producer.initTransactions();
+ producer.beginTransaction();
+ } catch (Exception exception) {
+ logHints(exception);
+ if (tryToAbortTx(exception)) {
+ LOG.error("Aborting Transaction [{}] caused by ERROR [{}]",
+ producer.getTransactionalId(),
+ exception.getMessage());
+ producer.abortTransaction();
+ }
+ LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage());
+ producer.close(0, TimeUnit.MILLISECONDS);
+ throw exception;
+ }
+ writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic);
+ producerEpoch = this.optimisticCommit ? -1 : producer.getEpoch();
+ producerId = this.optimisticCommit ? -1 : producer.getProducerId();
+ LOG.info("DONE with Initialization of {}, Epoch[{}], internal ID[{}]", writerIdTopicId, producerEpoch, producerId);
+ //Writer base working directory
+ openTxFileName =
+ this.optimisticCommit ?
+ null :
+ new Path(new Path(new Path(queryWorkingPath, TRANSACTION_DIR), producer.getTransactionalId()),
+ String.valueOf(producerEpoch));
+ }
+
+ @Override public void write(Writable w) throws IOException {
+ checkExceptions();
+ try {
+ sentRecords++;
+ producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) w), callback);
+ } catch (Exception e) {
+ if (tryToAbortTx(e)) {
+ // producer.send() may throw a KafkaException which wraps a FencedException; re-throw its wrapped inner cause.
+ producer.abortTransaction();
+ }
+ producer.close(0, TimeUnit.MILLISECONDS);
+ sendExceptionRef.compareAndSet(null, e);
+ checkExceptions();
+ }
+ }
+
+ private void logHints(Exception e) {
+ if (e instanceof TimeoutException) {
+ LOG.error("Maybe Try to increase [`retry.backoff.ms`] to avoid this error [{}].", e.getMessage());
+ }
+ }
+
+ /**
+ * The non Abort Close method can be split into 2 parts.
+ * Part one is to Flush to Kafka all the buffered Records then Log (Topic-Partition, Offset).
+ * Part two is To either commit the TX or Save the state of the TX to WAL and let HS2 do the commit.
+ *
+ * @param abort if set to true will abort flush and exit
+ * @throws IOException exception causing the failure
+ */
+ @Override public void close(boolean abort) throws IOException {
+ if (abort) {
+ // Case Abort, try to AbortTransaction -> Close producer ASAP -> Exit;
+ LOG.warn("Aborting Transaction and Sending from {}", writerIdTopicId);
+ try {
+ producer.abortTransaction();
+ } catch (Exception e) {
+ LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
+ }
+ producer.close(0, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ // Normal case -> log and commit, then close
+ LOG.info("Flushing Kafka buffer of writerId {}", writerIdTopicId);
+ producer.flush();
+
+ // No exception so far; log whatever was flushed.
+ String formattedMsg = "Topic[%s] Partition [%s] -> Last offset [%s]";
+ String
+ flushedOffsetMsg =
+ offsets.entrySet()
+ .stream()
+ .map(topicPartitionLongEntry -> String.format(formattedMsg,
+ topicPartitionLongEntry.getKey().topic(),
+ topicPartitionLongEntry.getKey().partition(),
+ topicPartitionLongEntry.getValue()))
+ .collect(Collectors.joining(","));
+
+ LOG.info("WriterId {} flushed the following [{}] ", writerIdTopicId, flushedOffsetMsg);
+ // OPTIMISTIC COMMIT OR PERSIST STATE OF THE TX_WAL
+ checkExceptions();
+ if (optimisticCommit) {
+ // Case Commit at the task level
+ commitTransaction();
+ } else {
+ // Case delegate TX commit to HS2
+ persistTxState();
+ }
+ checkExceptions();
+ producer.close();
+ LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
+ producer.getTransactionalId(),
+ sentRecords,
+ topic);
+ }
+
+ private void commitTransaction() {
+ LOG.info("Attempting Optimistic commit by {}", writerIdTopicId);
+ try {
+ producer.commitTransaction();
+ } catch (Exception e) {
+ sendExceptionRef.compareAndSet(null, e);
+ }
+ }
+
+ /**
+ * Write the Kafka producer PID and epoch to the checkpoint file {@link TransactionalKafkaWriter#openTxFileName}.
+ */
+ private void persistTxState() {
+ LOG.info("Committing state to path [{}] by [{}]", openTxFileName.toString(), writerIdTopicId);
+ try (FSDataOutputStream outStream = fileSystem.create(openTxFileName)) {
+ outStream.writeLong(producerId);
+ outStream.writeShort(producerEpoch);
+ } catch (Exception e) {
+ sendExceptionRef.compareAndSet(null, e);
+ }
+ }
+
+ @Override public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
+ write(kafkaWritable);
+ }
+
+ @Override public void close(Reporter reporter) throws IOException {
+ close(false);
+ }
+
+ @VisibleForTesting long getSentRecords() {
+ return sentRecords;
+ }
+
+ @VisibleForTesting short getProducerEpoch() {
+ return producerEpoch;
+ }
+
+ @VisibleForTesting long getProducerId() {
+ return producerId;
+ }
+
+ /**
+ * Checks for an existing send exception. In case of an exception will close the producer and rethrow as IOException.
+ * @throws IOException abort if possible, close the producer then rethrow the exception.
+ */
+ private void checkExceptions() throws IOException {
+ if (sendExceptionRef.get() != null && sendExceptionRef.get() instanceof KafkaException && sendExceptionRef.get()
+ .getCause() instanceof ProducerFencedException) {
+ // producer.send() may throw a KafkaException which wraps a FencedException; re-throw its wrapped inner cause.
+ sendExceptionRef.updateAndGet(e -> (KafkaException) e.getCause());
+ }
+ if (sendExceptionRef.get() != null) {
+ final Exception exception = sendExceptionRef.get();
+ logHints(exception);
+ if (tryToAbortTx(exception)) {
+ LOG.error("Aborting Transaction [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
+ producer.abortTransaction();
+ }
+ LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
+ producer.close(0, TimeUnit.MILLISECONDS);
+ throw new IOException(exception);
+ }
+ }
+
+ private boolean tryToAbortTx(Throwable e) {
+ // According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
+ // We can't recover from these exceptions, so our only option is to close the producer and exit.
+ boolean
+ isNotFencedOut =
+ !(e instanceof ProducerFencedException)
+ && !(e instanceof OutOfOrderSequenceException)
+ && !(e instanceof AuthenticationException);
+ // producer.send() may throw a KafkaException which wraps a FencedException therefore check inner cause.
+ boolean causeIsNotFencedOut = !(e.getCause() != null && e.getCause() instanceof ProducerFencedException);
+ return isNotFencedOut && causeIsNotFencedOut;
+ }
+
+ /**
+ * Given a query working directory (table_directory/hive_query_id/), fetches the open transaction states.
+ * Table directory is {@link org.apache.hadoop.hive.metastore.api.Table#getSd()#getLocation()}.
+ * Hive Query ID is inferred from the JobConf see {@link KafkaStorageHandler#getQueryId()}.
+ *
+ * The path to a transaction state is as follows:
+ * .../{@code queryWorkingDir}/{@code TRANSACTION_DIR}/{@code writerId}/{@code producerEpoch}
+ *
+ * The actual state is stored in the file {@code producerEpoch}.
+ * The file contains a {@link Long} as internal producer Id and a {@link Short} as the producer epoch.
+ * According to the Kafka API, the highest epoch corresponds to the active producer, therefore if there are multiple
+ * {@code producerEpoch} files the maximum is picked based on {@link Short#compareTo}.
+ *
+ * @param fs File system handler.
+ * @param queryWorkingDir Query working Directory, see:
+ * {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}.
+ * @return Map of Transaction Ids to Pair of Kafka Producer internal ID (Long) and producer epoch (short)
+ * @throws IOException if any of the IO operations fail.
+ */
+ static Map<String, Pair<Long, Short>> getTransactionsState(FileSystem fs, Path queryWorkingDir) throws IOException {
+ // list all current transaction directories
+ final Path transactionWorkingDir = new Path(queryWorkingDir, TRANSACTION_DIR);
+ final FileStatus[] files = fs.listStatus(transactionWorkingDir);
+ final Set<FileStatus>
+ transactionSet =
+ Arrays.stream(files).filter(FileStatus::isDirectory).collect(Collectors.toSet());
+ Set<Path> setOfTxPath = transactionSet.stream().map(FileStatus::getPath).collect(Collectors.toSet());
+ ImmutableMap.Builder<String, Pair<Long, Short>> builder = ImmutableMap.builder();
+ setOfTxPath.forEach(path -> {
+ final String txId = path.getName();
+ try {
+ FileStatus[] epochFiles = fs.listStatus(path);
+ // List all the epoch files, if any, and select the max.
+ // According to the Kafka API, a more recent version of a producer with the same TxID will have a greater epoch and the same PID.
+ Optional<Short>
+ maxEpoch =
+ Arrays.stream(epochFiles)
+ .filter(FileStatus::isFile)
+ .map(fileStatus -> Short.valueOf(fileStatus.getPath().getName()))
+ .max(Short::compareTo);
+ short
+ epoch =
+ maxEpoch.orElseThrow(() -> new RuntimeException("Missing epoch file under directory ["
+ + path.toString()
+ + "]"));
+ Path openTxFileName = new Path(path, String.valueOf(epoch));
+ long internalId;
+ try (FSDataInputStream inStream = fs.open(openTxFileName)) {
+ internalId = inStream.readLong();
+ short fileEpoch = inStream.readShort();
+ if (epoch != fileEpoch) {
+ throw new RuntimeException(String.format("Was expecting [%s] but got [%s] from path [%s]",
+ epoch,
+ fileEpoch,
+ path.toString()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ builder.put(txId, Pair.of(internalId, epoch));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ return builder.build();
+ }
+}
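A sketch of how the persisted transaction state can be read back (illustrative only; the working directory path is a placeholder, and the commit step is what KafkaStorageHandler is expected to perform at job commit time):

  // List the writerId -> (producer id, epoch) pairs persisted by non-optimistic writers
  // under <queryWorkingDir>/transaction_states/.
  FileSystem fs = FileSystem.get(new Configuration());
  Path queryWorkingDir = new Path("/warehouse/kafka_table/hive_query_id_123");

  Map<String, Pair<Long, Short>> openTransactions =
      TransactionalKafkaWriter.getTransactionsState(fs, queryWorkingDir);

  openTransactions.forEach((txId, idAndEpoch) ->
      // A committer (e.g. HS2) can build a HiveKafkaProducer with the same transactional id,
      // call resumeTransaction(idAndEpoch.getLeft(), idAndEpoch.getRight()) and commit it.
      System.out.printf("txId=%s pid=%d epoch=%d%n", txId, idAndEpoch.getLeft(), idAndEpoch.getRight()));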
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
new file mode 100644
index 0000000..db2515c
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Test class for Hive Kafka Producer.
+ */
+@SuppressWarnings("unchecked") public class HiveKafkaProducerTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
+ private static final int RECORD_NUMBER = 17384;
+ private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+ private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
+
+ private static final String TOPIC = "test-tx-producer";
+ private static final List<ProducerRecord<byte[], byte[]>>
+ RECORDS =
+ IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
+ final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+ return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
+ }).collect(Collectors.toList());
+
+ @BeforeClass public static void setupCluster() throws Throwable {
+ KAFKA_BROKER_RESOURCE.before();
+ }
+
+ @AfterClass public static void tearDownCluster() {
+ KAFKA_BROKER_RESOURCE.after();
+ }
+
+ private KafkaConsumer<byte[], byte[]> consumer;
+ private Properties producerProperties;
+ private HiveKafkaProducer producer;
+
+ @Before public void setUp() {
+ LOG.info("setting up Config");
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("enable.auto.commit", "false");
+ consumerProps.setProperty("auto.offset.reset", "none");
+ consumerProps.setProperty("bootstrap.servers", KafkaBrokerResource.BROKER_IP_PORT);
+ consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("request.timeout.ms", "3002");
+ consumerProps.setProperty("fetch.max.wait.ms", "3001");
+ consumerProps.setProperty("session.timeout.ms", "3001");
+ consumerProps.setProperty("metadata.max.age.ms", "100");
+ consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ this.consumer = new KafkaConsumer<>(consumerProps);
+
+ String txId = UUID.randomUUID().toString();
+ producerProperties = new Properties();
+ producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaBrokerResource.BROKER_IP_PORT);
+ producerProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producer = new HiveKafkaProducer(producerProperties);
+ }
+
+ @After public void tearDown() {
+ LOG.info("tearDown");
+ consumer.close();
+ consumer = null;
+ producer.close();
+ producer = null;
+ }
+
+ @Test public void resumeTransaction() {
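+ // Open a transaction with the first producer and capture its producer id and epoch before closing it.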
+ producer.initTransactions();
+ producer.beginTransaction();
+ long pid = producer.getProducerId();
+ short epoch = producer.getEpoch();
+ Assert.assertTrue(pid > -1);
+ Assert.assertTrue(epoch > -1);
+ //noinspection unchecked
+ RECORDS.forEach(producer::send);
+ producer.flush();
+ producer.close();
+
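+ // A second producer resumes the still-open transaction from the captured id/epoch and commits it.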
+ HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+ secondProducer.resumeTransaction(pid, epoch);
+ secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.commitTransaction();
+ secondProducer.close();
+
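+ // After the commit, every produced record should be visible to the read_committed consumer, in order.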
+ Collection<TopicPartition> assignment = Collections.singletonList(new TopicPartition(TOPIC, 0));
+ consumer.assign(assignment);
+ consumer.seekToBeginning(assignment);
+ long numRecords = 0;
+ final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList<>();
+ while (numRecords < RECORD_NUMBER) {
+ ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
+ numRecords += consumerRecords.count();
+ }
+ Assert.assertEquals("consumed record count should match produced record count", RECORDS.size(), actualRecords.size());
+ Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
+ Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
+ while (expectedIt.hasNext()) {
+ ProducerRecord<byte[], byte[]> expected = expectedIt.next();
+ ConsumerRecord<byte[], byte[]> actual = actualIt.next();
+ Assert.assertArrayEquals("value not matching", expected.value(), actual.value());
+ Assert.assertArrayEquals("key not matching", expected.key(), actual.key());
+ }
+ }
+
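+ // Resuming a transaction with a producer id and epoch unknown to the broker should fail with a KafkaException.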
+ @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
+ HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+ secondProducer.resumeTransaction(3434L, (short) 12);
+ secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.close();
+ }
+
+ @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ long pid = producer.getProducerId();
+ producer.close();
+ HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+ secondProducer.resumeTransaction(pid, (short) 12);
+ secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.close();
+ }
+
+ @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ short epoch = producer.getEpoch();
+ producer.close();
+ HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+ secondProducer.resumeTransaction(45L, epoch);
+ secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+ secondProducer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
new file mode 100644
index 0000000..fbcbe9a
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.zk.AdminZkClient;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.utils.Time;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Properties;
+
+/**
+ * JUnit {@link ExternalResource} test helper that starts and stops an embedded Zookeeper and a single Kafka broker.
+ */
+class KafkaBrokerResource extends ExternalResource {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerResource.class);
+ private static final String TOPIC = "TEST-CREATE_TOPIC";
+ static final String BROKER_IP_PORT = "127.0.0.1:9092";
+ private EmbeddedZookeeper zkServer;
+ private KafkaServer kafkaServer;
+ private AdminZkClient adminZkClient;
+ private Path tmpLogDir;
+
+ /**
+ * Starts an embedded Zookeeper, starts a single Kafka broker listening on {@code BROKER_IP_PORT},
+ * and creates the test topic.
+ *
+ * @throws Throwable if setup fails (which will disable {@code after})
+ */
+ @Override protected void before() throws Throwable {
+ // Start the ZK and the Broker
+ LOG.info("init embedded Zookeeper");
+ zkServer = new EmbeddedZookeeper();
+ tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath();
+ String zkConnect = "127.0.0.1:" + zkServer.port();
+ LOG.info("init kafka broker");
+ Properties brokerProps = new Properties();
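+ // Minimal single-broker setup; internal topics (offsets, transaction state) need replication factor 1 on one node.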
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dir", tmpLogDir.toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKER_IP_PORT);
+ brokerProps.setProperty("offsets.topic.replication.factor", "1");
+ brokerProps.setProperty("transaction.state.log.replication.factor", "1");
+ brokerProps.setProperty("transaction.state.log.min.isr", "1");
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
+ kafkaServer.startup();
+ adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+ LOG.info("Creating kafka TOPIC [{}]", TOPIC);
+ adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ }
+
+ /**
+ * Shuts down the Kafka broker and the embedded Zookeeper and deletes the temporary log directory.
+ */
+ @Override protected void after() {
+ super.after();
+ try {
+ FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+ } catch (IOException e) {
+ LOG.error("Error cleaning " + tmpLogDir.toString(), e);
+ }
+ if (kafkaServer != null) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+ }
+ zkServer.shutdown();
+ }
+
+ void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
+ adminZkClient.deleteTopic(topic);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
new file mode 100644
index 0000000..6e95a54
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Kafka Hadoop InputSplit Test.
+ */
+public class KafkaInputSplitTest {
+ private final KafkaInputSplit expectedInputSplit;
+
+ public KafkaInputSplitTest() {
+ String topic = "my_topic";
+ this.expectedInputSplit = new KafkaInputSplit(topic, 1, 50L, 56L, new Path("/tmp"));
+ }
+
+ @Test public void testWriteRead() throws IOException {
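+ // Round-trip the split through Hadoop Writable serialization and expect an equal split back.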
+ DataOutputBuffer output = new DataOutputBuffer();
+ this.expectedInputSplit.write(output);
+ KafkaInputSplit kafkaInputSplit = new KafkaInputSplit();
+ DataInputBuffer input = new DataInputBuffer();
+ input.reset(output.getData(), 0, output.getLength());
+ kafkaInputSplit.readFields(input);
+ Assert.assertEquals(this.expectedInputSplit, kafkaInputSplit);
+ }
+
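+ // intersectRange should return the overlapping portion of two splits on the same topic partition.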
+ @Test public void andRangeOverLapping() {
+ KafkaInputSplit kafkaInputSplit = new KafkaInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+ KafkaInputSplit kafkaInputSplit2 = new KafkaInputSplit("test-topic", 2, 3, 200, new Path("/tmp"));
+
+ Assert.assertEquals(new KafkaInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
+ KafkaInputSplit.intersectRange(kafkaInputSplit, kafkaInputSplit2));
+
+ }
+
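+ // Disjoint offset ranges have no intersection, so intersectRange should return null.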
+ @Test public void andRangeNonOverLapping() {
+ KafkaInputSplit kafkaInputSplit = new KafkaInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+ KafkaInputSplit kafkaInputSplit2 =
+ new KafkaInputSplit("test-topic", 2, 550, 700, new Path("/tmp"));
+
+ Assert.assertNull(KafkaInputSplit.intersectRange(kafkaInputSplit, kafkaInputSplit2));
+
+ }
+
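+ // unionRange should return the smallest range covering both splits.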
+ @Test public void orRange() {
+ KafkaInputSplit kafkaInputSplit =
+ new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaInputSplit kafkaInputSplit2 = new KafkaInputSplit("test-topic", 2, 3, 600, new Path("/tmp"));
+
+ Assert.assertEquals(kafkaInputSplit2,
+ KafkaInputSplit.unionRange(kafkaInputSplit, kafkaInputSplit2));
+
+ KafkaInputSplit kafkaInputSplit3 =
+ new KafkaInputSplit("test-topic", 2, 700, 6000, new Path("/tmp"));
+
+ Assert.assertEquals(new KafkaInputSplit("test-topic", 2, 300, 6000, new Path("/tmp")),
+ KafkaInputSplit.unionRange(kafkaInputSplit, kafkaInputSplit3));
+ }
+
+ @Test public void copyOf() {
+ KafkaInputSplit kafkaInputSplit =
+ new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaInputSplit copyOf = KafkaInputSplit.copyOf(kafkaInputSplit);
+ Assert.assertEquals(kafkaInputSplit, copyOf);
+ Assert.assertNotSame(kafkaInputSplit, copyOf);
+ }
+
+ @Test public void testClone() {
+ KafkaInputSplit kafkaInputSplit =
+ new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaInputSplit clone = KafkaInputSplit.copyOf(kafkaInputSplit);
+ Assert.assertEquals(kafkaInputSplit, clone);
+ Assert.assertNotSame(clone, kafkaInputSplit);
+
+ }
+
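+ // Slicing should yield sub-splits whose lengths sum to the original range and that preserve its start and end offsets.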
+ @Test public void testSlice() {
+ KafkaInputSplit kafkaInputSplit =
+ new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+ List<KafkaInputSplit> kafkaInputSplitList = KafkaInputSplit.slice(14, kafkaInputSplit);
+ Assert.assertEquals(kafkaInputSplitList.stream()
+ .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
+ - kafkaPullerInputSplit1.getStartOffset())
+ .sum(), kafkaInputSplit.getEndOffset() - kafkaInputSplit.getStartOffset());
+ Assert.assertEquals(1,
+ kafkaInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaInputSplit.getStartOffset()
+ == kafkaPullerInputSplit1.getStartOffset())
+ .count());
+ Assert.assertEquals(1,
+ kafkaInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaInputSplit.getEndOffset()
+ == kafkaPullerInputSplit1.getEndOffset())
+ .count());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
deleted file mode 100644
index 00f95ca..0000000
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Kafka Hadoop InputSplit Test.
- */
-public class KafkaPullerInputSplitTest {
- private KafkaPullerInputSplit expectedInputSplit;
-
- public KafkaPullerInputSplitTest() {
- String topic = "my_topic";
- this.expectedInputSplit = new KafkaPullerInputSplit(topic, 1, 50L, 56L, new Path("/tmp"));
- }
-
- @Test public void testWriteRead() throws IOException {
- DataOutput output = new DataOutputBuffer();
- this.expectedInputSplit.write(output);
- KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit();
- DataInput input = new DataInputBuffer();
- ((DataInputBuffer) input).reset(((DataOutputBuffer) output).getData(), 0, ((DataOutputBuffer) output).getLength());
- kafkaPullerInputSplit.readFields(input);
- Assert.assertEquals(this.expectedInputSplit, kafkaPullerInputSplit);
- }
-
- @Test public void andRangeOverLapping() {
- KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
-
- KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 200, new Path("/tmp"));
-
- Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
- KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
- }
-
- @Test public void andRangeNonOverLapping() {
- KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
-
- KafkaPullerInputSplit
- kafkaPullerInputSplit2 =
- new KafkaPullerInputSplit("test-topic", 2, 550, 700, new Path("/tmp"));
-
- Assert.assertEquals(null, KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
- }
-
- @Test public void orRange() {
- KafkaPullerInputSplit
- kafkaPullerInputSplit =
- new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
- KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 600, new Path("/tmp"));
-
- Assert.assertEquals(kafkaPullerInputSplit2,
- KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
- KafkaPullerInputSplit
- kafkaPullerInputSplit3 =
- new KafkaPullerInputSplit("test-topic", 2, 700, 6000, new Path("/tmp"));
-
- Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 300, 6000, new Path("/tmp")),
- KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit3));
- }
-
- @Test public void copyOf() {
- KafkaPullerInputSplit
- kafkaPullerInputSplit =
- new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
- KafkaPullerInputSplit copyOf = KafkaPullerInputSplit.copyOf(kafkaPullerInputSplit);
- Assert.assertEquals(kafkaPullerInputSplit, copyOf);
- Assert.assertTrue(kafkaPullerInputSplit != copyOf);
- }
-
- @Test public void testClone() {
- KafkaPullerInputSplit
- kafkaPullerInputSplit =
- new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
- KafkaPullerInputSplit clone = kafkaPullerInputSplit.clone();
- Assert.assertEquals(kafkaPullerInputSplit, clone);
- Assert.assertTrue(clone != kafkaPullerInputSplit);
-
- }
-
- @Test public void testSlice() {
- KafkaPullerInputSplit
- kafkaPullerInputSplit =
- new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
- List<KafkaPullerInputSplit> kafkaPullerInputSplitList = KafkaPullerInputSplit.slice(14, kafkaPullerInputSplit);
- Assert.assertEquals(kafkaPullerInputSplitList.stream()
- .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
- - kafkaPullerInputSplit1.getStartOffset())
- .sum(), kafkaPullerInputSplit.getEndOffset() - kafkaPullerInputSplit.getStartOffset());
- Assert.assertTrue(kafkaPullerInputSplitList.stream()
- .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getStartOffset()
- == kafkaPullerInputSplit1.getStartOffset())
- .count() == 1);
- Assert.assertTrue(kafkaPullerInputSplitList.stream()
- .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getEndOffset() == kafkaPullerInputSplit1.getEndOffset())
- .count() == 1);
-
- }
-}