You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/02 03:49:11 UTC

[06/25] hive git commit: HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates)

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
new file mode 100644
index 0000000..9aab346
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
@@ -0,0 +1,83 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */
+class CreatePartitionHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final String databaseName;
+  private final String tableName;
+
+  CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) {
+    this.metaStoreClient = metaStoreClient;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  /** Returns the expected {@link Path} for a given partition value. */
+  Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
+    try {
+      String location;
+      if (newPartitionValues.isEmpty()) {
+        location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation();
+      } else {
+        location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation();
+      }
+      LOG.debug("Found path {} for partition {}", location, newPartitionValues);
+      return new Path(location);
+    } catch (NoSuchObjectException e) {
+      throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e);
+    } catch (TException e) {
+      throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
+          + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
+    }
+  }
+
+  /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
+  void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+    if (newPartitionValues.isEmpty()) {
+      return;
+    }
+
+    try {
+      LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
+      Table table = metaStoreClient.getTable(databaseName, tableName);
+
+      Partition partition = new Partition();
+      partition.setDbName(table.getDbName());
+      partition.setTableName(table.getTableName());
+      StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+      partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+          + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
+      partition.setSd(partitionSd);
+      partition.setValues(newPartitionValues);
+
+      metaStoreClient.add_partition(partition);
+    } catch (AlreadyExistsException e) {
+      LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues);
+    } catch (NoSuchObjectException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
+    } catch (TException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
+          + databaseName + "." + tableName + "'", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
new file mode 100644
index 0000000..f8e46d6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class GroupRevisitedException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  GroupRevisitedException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
new file mode 100644
index 0000000..8ae3904
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
@@ -0,0 +1,74 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Tracks the (partition, bucket) combinations that have been encountered, checking that a group is not revisited.
+ * Potentially memory intensive.
+ */
+class GroupingValidator {
+
+  private final Map<String, Set<Integer>> visited;
+  private final StringBuffer partitionKeyBuilder;
+  private long groups;
+  private String lastPartitionKey;
+  private int lastBucketId = -1;
+
+  GroupingValidator() {
+    visited = new HashMap<String, Set<Integer>>();
+    partitionKeyBuilder = new StringBuffer(64);
+  }
+
+  /**
+   * Checks that this group is either the same as the last or is a new group.
+   */
+  boolean isInSequence(List<String> partitionValues, int bucketId) {
+    String partitionKey = getPartitionKey(partitionValues);
+    if (Objects.equals(lastPartitionKey, partitionKey) && lastBucketId == bucketId) {
+      return true;
+    }
+    lastPartitionKey = partitionKey;
+    lastBucketId = bucketId;
+
+    Set<Integer> bucketIdSet = visited.get(partitionKey);
+    if (bucketIdSet == null) {
+      // If the bucket id set component of this data structure proves to be too large there is the
+      // option of moving it to Trove or HPPC in an effort to reduce size.
+      bucketIdSet = new HashSet<>();
+      visited.put(partitionKey, bucketIdSet);
+    }
+
+    boolean newGroup = bucketIdSet.add(bucketId);
+    if (newGroup) {
+      groups++;
+    }
+    return newGroup;
+  }
+
+  private String getPartitionKey(List<String> partitionValues) {
+    partitionKeyBuilder.setLength(0);
+    boolean first = true;
+    for (String element : partitionValues) {
+      if (first) {
+        first = false;
+      } else {
+        partitionKeyBuilder.append('/');
+      }
+      partitionKeyBuilder.append(element);
+    }
+    String partitionKey = partitionKeyBuilder.toString();
+    return partitionKey;
+  }
+
+  @Override
+  public String toString() {
+    return "GroupingValidator [groups=" + groups + ",lastPartitionKey=" + lastPartitionKey + ",lastBucketId="
+        + lastBucketId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
new file mode 100644
index 0000000..96ecce9
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
@@ -0,0 +1,21 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+/**
+ * Interface for submitting mutation events to a given partition and bucket in an ACID table. Requires records to arrive
+ * in the order defined by the {@link SequenceValidator}.
+ */
+public interface Mutator extends Closeable, Flushable {
+
+  void insert(Object record) throws IOException;
+
+  void update(Object record) throws IOException;
+
+  void delete(Object record) throws IOException;
+
+  void flush() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
new file mode 100644
index 0000000..96f05e5
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -0,0 +1,281 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. Events must be grouped
+ * by partition, then bucket and ordered by origTxnId, then rowId. Ordering is enforced by the {@link SequenceValidator}
+ * and grouping is by the {@link GroupingValidator}. An acid delta file is created for each combination partition, and
+ * bucket id (a single transaction id is implied). Once a delta file has been closed it cannot be reopened. Therefore
+ * care is needed as to group the data correctly otherwise failures will occur if a delta belonging to group has been
+ * previously closed. The {@link MutatorCoordinator} will seamlessly handle transitions between groups, creating and
+ * closing {@link Mutator Mutators} as needed to write to the appropriate partition and bucket. New partitions will be
+ * created in the meta store if {@link AcidTable#createPartitions()} is set.
+ * <p/>
+ * {@link #insert(List, Object) Insert} events must be artificially assigned appropriate bucket ids in the preceding
+ * grouping phase so that they are grouped correctly. Note that any transaction id or row id assigned to the
+ * {@link RecordIdentifier RecordIdentifier} of such events will be ignored by both the coordinator and the underlying
+ * {@link RecordUpdater}.
+ */
+public class MutatorCoordinator implements Closeable, Flushable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final MutatorFactory mutatorFactory;
+  private final GroupingValidator groupingValidator;
+  private final SequenceValidator sequenceValidator;
+  private final AcidTable table;
+  private final RecordInspector recordInspector;
+  private final CreatePartitionHelper partitionHelper;
+  private final AcidOutputFormat<?, ?> outputFormat;
+  private final BucketIdResolver bucketIdResolver;
+  private final HiveConf configuration;
+  private final boolean deleteDeltaIfExists;
+
+  private int bucketId;
+  private List<String> partitionValues;
+  private Path partitionPath;
+  private Mutator mutator;
+
+  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
+    this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient,
+        table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table,
+        deleteDeltaIfExists);
+  }
+
+  /** Visible for testing only. */
+  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+      CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator,
+      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
+    this.metaStoreClient = metaStoreClient;
+    this.configuration = configuration;
+    this.mutatorFactory = mutatorFactory;
+    this.partitionHelper = partitionHelper;
+    this.groupingValidator = groupingValidator;
+    this.sequenceValidator = sequenceValidator;
+    this.table = table;
+    this.deleteDeltaIfExists = deleteDeltaIfExists;
+    this.recordInspector = this.mutatorFactory.newRecordInspector();
+    bucketIdResolver = this.mutatorFactory.newBucketIdResolver(table.getTotalBuckets());
+
+    bucketId = -1;
+    outputFormat = createOutputFormat(table.getOutputFormatName(), configuration);
+  }
+
+  /**
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * 
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException
+   */
+  public void insert(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.INSERT, partitionValues, record);
+    try {
+      mutator.insert(record);
+      LOG.debug("Inserted into partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      throw new WorkerException("Failed to insert record '" + record + " using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  /**
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * 
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException
+   */
+  public void update(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.UPDATE, partitionValues, record);
+    try {
+      mutator.update(record);
+      LOG.debug("Updated in partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      throw new WorkerException("Failed to update record '" + record + " using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  /**
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * 
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException
+   */
+  public void delete(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.DELETE, partitionValues, record);
+    try {
+      mutator.delete(record);
+      LOG.debug("Deleted from partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      throw new WorkerException("Failed to delete record '" + record + " using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (mutator != null) {
+        mutator.close();
+      }
+    } finally {
+      metaStoreClient.close();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (mutator != null) {
+      mutator.flush();
+    }
+  }
+
+  private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record)
+    throws WorkerException {
+    RecordIdentifier newRecordIdentifier = extractRecordIdentifier(operationType, newPartitionValues, record);
+    int newBucketId = newRecordIdentifier.getBucketId();
+
+    if (newPartitionValues == null) {
+      newPartitionValues = Collections.emptyList();
+    }
+
+    try {
+      if (partitionHasChanged(newPartitionValues)) {
+        if (table.createPartitions()) {
+          partitionHelper.createPartitionIfNotExists(newPartitionValues);
+        }
+        Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
+        resetMutator(newBucketId, newPartitionValues, newPartitionPath);
+      } else if (bucketIdHasChanged(newBucketId)) {
+        resetMutator(newBucketId, partitionValues, partitionPath);
+      } else {
+        validateRecordSequence(operationType, newRecordIdentifier);
+      }
+    } catch (IOException e) {
+      throw new WorkerException("Failed to reset mutator when performing " + operationType + " of record: " + record, e);
+    }
+  }
+
+  private RecordIdentifier extractRecordIdentifier(OperationType operationType, List<String> newPartitionValues,
+      Object record) throws BucketIdException {
+    RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record);
+    int computedBucketId = bucketIdResolver.computeBucketId(record);
+    if (operationType != OperationType.DELETE && recordIdentifier.getBucketId() != computedBucketId) {
+      throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId
+          + ") for record " + recordIdentifier + " in partition " + newPartitionValues + ".");
+    }
+    return recordIdentifier;
+  }
+
+  private void resetMutator(int newBucketId, List<String> newPartitionValues, Path newPartitionPath)
+    throws IOException, GroupRevisitedException {
+    if (mutator != null) {
+      mutator.close();
+    }
+    validateGrouping(newPartitionValues, newBucketId);
+    sequenceValidator.reset();
+    if (deleteDeltaIfExists) {
+      // TODO: Should this be the concern of the mutator?
+      deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId);
+    }
+    mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId);
+    bucketId = newBucketId;
+    partitionValues = newPartitionValues;
+    partitionPath = newPartitionPath;
+    LOG.debug("Reset mutator: bucketId={}, partition={}, partitionPath={}", bucketId, partitionValues, partitionPath);
+  }
+
+  private boolean partitionHasChanged(List<String> newPartitionValues) {
+    boolean partitionHasChanged = !Objects.equals(this.partitionValues, newPartitionValues);
+    if (partitionHasChanged) {
+      LOG.debug("Partition changed from={}, to={}", this.partitionValues, newPartitionValues);
+    }
+    return partitionHasChanged;
+  }
+
+  private boolean bucketIdHasChanged(int newBucketId) {
+    boolean bucketIdHasChanged = this.bucketId != newBucketId;
+    if (bucketIdHasChanged) {
+      LOG.debug("Bucket ID changed from={}, to={}", this.bucketId, newBucketId);
+    }
+    return bucketIdHasChanged;
+  }
+
+  private void validateGrouping(List<String> newPartitionValues, int newBucketId) throws GroupRevisitedException {
+    if (!groupingValidator.isInSequence(newPartitionValues, bucketId)) {
+      throw new GroupRevisitedException("Group out of sequence: state=" + groupingValidator + ", partition="
+          + newPartitionValues + ", bucketId=" + newBucketId);
+    }
+  }
+
+  private void validateRecordSequence(OperationType operationType, RecordIdentifier newRecordIdentifier)
+    throws RecordSequenceException {
+    boolean identiferOutOfSequence = operationType != OperationType.INSERT
+        && !sequenceValidator.isInSequence(newRecordIdentifier);
+    if (identiferOutOfSequence) {
+      throw new RecordSequenceException("Records not in sequence: state=" + sequenceValidator + ", recordIdentifier="
+          + newRecordIdentifier);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private AcidOutputFormat<?, ?> createOutputFormat(String outputFormatName, HiveConf configuration)
+    throws WorkerException {
+    try {
+      return (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outputFormatName), configuration);
+    } catch (ClassNotFoundException e) {
+      throw new WorkerException("Could not locate class for '" + outputFormatName + "'.", e);
+    }
+  }
+
+  private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
+    Path deltaPath = AcidUtils.createFilename(partitionPath,
+        new AcidOutputFormat.Options(configuration)
+            .bucket(bucketId)
+            .minimumTransactionId(transactionId)
+            .maximumTransactionId(transactionId));
+    FileSystem fileSystem = deltaPath.getFileSystem(configuration);
+    if (fileSystem.exists(deltaPath)) {
+      LOG.info("Deleting existing delta path: {}", deltaPath);
+      fileSystem.delete(deltaPath, false);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
new file mode 100644
index 0000000..8851ea6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
@@ -0,0 +1,76 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
+import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+
+/** Convenience class for building {@link MutatorCoordinator} instances. */
+public class MutatorCoordinatorBuilder {
+
+  private HiveConf configuration;
+  private MutatorFactory mutatorFactory;
+  private UserGroupInformation authenticatedUser;
+  private String metaStoreUri;
+  private AcidTable table;
+  private boolean deleteDeltaIfExists;
+
+  public MutatorCoordinatorBuilder configuration(HiveConf configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
+  public MutatorCoordinatorBuilder authenticatedUser(UserGroupInformation authenticatedUser) {
+    this.authenticatedUser = authenticatedUser;
+    return this;
+  }
+
+  public MutatorCoordinatorBuilder metaStoreUri(String metaStoreUri) {
+    this.metaStoreUri = metaStoreUri;
+    return this;
+  }
+
+  /** Set the destination ACID table for this client. */
+  public MutatorCoordinatorBuilder table(AcidTable table) {
+    this.table = table;
+    return this;
+  }
+
+  /**
+   * If the delta file already exists, delete it. THis is useful in a MapReduce setting where a number of task retries
+   * will attempt to write the same delta file.
+   */
+  public MutatorCoordinatorBuilder deleteDeltaIfExists() {
+    this.deleteDeltaIfExists = true;
+    return this;
+  }
+
+  public MutatorCoordinatorBuilder mutatorFactory(MutatorFactory mutatorFactory) {
+    this.mutatorFactory = mutatorFactory;
+    return this;
+  }
+
+  public MutatorCoordinator build() throws WorkerException, MetaException {
+    String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
+    boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
+
+    configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
+
+    IMetaStoreClient metaStoreClient;
+    try {
+      metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode)
+          .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+    } catch (IOException e) {
+      throw new WorkerException("Could not create meta store client.", e);
+    }
+
+    return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
new file mode 100644
index 0000000..850054f
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
@@ -0,0 +1,16 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+
+public interface MutatorFactory {
+
+  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException;
+  
+  RecordInspector newRecordInspector();
+  
+  BucketIdResolver newBucketIdResolver(int totalBuckets);
+  
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
new file mode 100644
index 0000000..0fe41d5
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -0,0 +1,84 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. */
+public class MutatorImpl implements Mutator {
+
+  private final long transactionId;
+  private final Path partitionPath;
+  private final int bucketId;
+  private final Configuration configuration;
+  private final int recordIdColumn;
+  private final ObjectInspector objectInspector;
+  private RecordUpdater updater;
+
+  public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector,
+      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException {
+    this.configuration = configuration;
+    this.recordIdColumn = recordIdColumn;
+    this.objectInspector = objectInspector;
+    this.transactionId = transactionId;
+    this.partitionPath = partitionPath;
+    this.bucketId = bucketId;
+
+    updater = createRecordUpdater(outputFormat);
+  }
+
+  @Override
+  public void insert(Object record) throws IOException {
+    updater.insert(transactionId, record);
+  }
+
+  @Override
+  public void update(Object record) throws IOException {
+    updater.update(transactionId, record);
+  }
+
+  @Override
+  public void delete(Object record) throws IOException {
+    updater.delete(transactionId, record);
+  }
+
+  /**
+   * This implementation does intentionally nothing at this time. We only use a single transaction and
+   * {@link OrcRecordUpdater#flush()} will purposefully throw and exception in this instance. We keep this here in the
+   * event that we support multiple transactions and to make it clear that the omission of an invocation of
+   * {@link OrcRecordUpdater#flush()} was not a mistake.
+   */
+  @Override
+  public void flush() throws IOException {
+    // Intentionally do nothing
+  }
+
+  @Override
+  public void close() throws IOException {
+    updater.close(false);
+    updater = null;
+  }
+
+  @Override
+  public String toString() {
+    return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath
+        + ", bucketId=" + bucketId + "]";
+  }
+
+  protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> outputFormat) throws IOException {
+    return outputFormat.getRecordUpdater(
+        partitionPath,
+        new AcidOutputFormat.Options(configuration)
+            .inspector(objectInspector)
+            .bucket(bucketId)
+            .minimumTransactionId(transactionId)
+            .maximumTransactionId(transactionId)
+            .recordIdColumn(recordIdColumn));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
new file mode 100644
index 0000000..5ecb1bb
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
@@ -0,0 +1,7 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+enum OperationType {
+  INSERT,
+  UPDATE,
+  DELETE;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
new file mode 100644
index 0000000..5b59e01
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class PartitionCreationException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  PartitionCreationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  PartitionCreationException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
new file mode 100644
index 0000000..11ef0dd
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+
+/** Provide a means to extract {@link RecordIdentifier} from record objects. */
+public interface RecordInspector {
+
+  /** Get the {@link RecordIdentifier} from the record - to be used for updates and deletes only. */
+  RecordIdentifier extractRecordIdentifier(Object record);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
new file mode 100644
index 0000000..18ee458
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
@@ -0,0 +1,45 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Standard {@link RecordInspector} implementation that uses the supplied {@link ObjectInspector} and
+ * {@link AcidOutputFormat.Options#recordIdColumn(int) record id column} to extract {@link RecordIdentifier
+ * RecordIdentifiers}, and calculate bucket ids from records.
+ */
+public class RecordInspectorImpl implements RecordInspector {
+
+  private final StructObjectInspector structObjectInspector;
+  private final StructField recordIdentifierField;
+
+  /**
+   * Note that all column indexes are with respect to your record structure, not the Hive table structure.
+   */
+  public RecordInspectorImpl(ObjectInspector objectInspector, int recordIdColumn) {
+    if (!(objectInspector instanceof StructObjectInspector)) {
+      throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a "
+          + objectInspector.getClass().getName());
+    }
+
+    structObjectInspector = (StructObjectInspector) objectInspector;
+    List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+    recordIdentifierField = structFields.get(recordIdColumn);
+  }
+
+  public RecordIdentifier extractRecordIdentifier(Object record) {
+    return (RecordIdentifier) structObjectInspector.getStructFieldData(record, recordIdentifierField);
+  }
+
+  @Override
+  public String toString() {
+    return "RecordInspectorImpl [structObjectInspector=" + structObjectInspector + ", recordIdentifierField="
+        + recordIdentifierField + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
new file mode 100644
index 0000000..6b034f1
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class RecordSequenceException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  RecordSequenceException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
new file mode 100644
index 0000000..bcff4d6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
@@ -0,0 +1,49 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that the sequence of {@link RecordIdentifier RecordIdentifiers} are in a valid order for insertion into an
+ * ACID delta file in a given partition and bucket.
+ */
+class SequenceValidator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class);
+
+  private Long lastTxId;
+  private Long lastRowId;
+
+  SequenceValidator() {
+  }
+
+  boolean isInSequence(RecordIdentifier recordIdentifier) {
+    if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) {
+      LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier);
+      return false;
+    } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null
+        && recordIdentifier.getRowId() <= lastRowId) {
+      LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier);
+      return false;
+    }
+    lastTxId = recordIdentifier.getTransactionId();
+    lastRowId = recordIdentifier.getRowId();
+    return true;
+  }
+
+  /**
+   * Validator must be reset for each new partition and or bucket.
+   */
+  void reset() {
+    lastTxId = null;
+    lastRowId = null;
+    LOG.debug("reset");
+  }
+
+  @Override
+  public String toString() {
+    return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
new file mode 100644
index 0000000..1fa1998
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class WorkerException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  WorkerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  WorkerException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
new file mode 100644
index 0000000..86d70d4
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
@@ -0,0 +1,82 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.util.List;
+
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+
+public class ExampleUseCase {
+
+  private String metaStoreUri;
+  private String databaseName;
+  private String tableName;
+  private boolean createPartitions = true;
+  private List<String> partitionValues1, partitionValues2, partitionValues3;
+  private Object record1, record2, record3;
+  private MutatorFactory mutatorFactory;
+
+  /* This is an illustration, not a functioning example. */ 
+  public void example() throws Exception {
+    // CLIENT/TOOL END
+    //
+    // Singleton instance in the job client
+
+    // Create a client to manage our transaction
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(databaseName, tableName, createPartitions)
+        .metaStoreUri(metaStoreUri)
+        .build();
+
+    // Get the transaction
+    Transaction transaction = client.newTransaction();
+
+    // Get serializable details of the destination tables
+    List<AcidTable> tables = client.getTables();
+
+    transaction.begin();
+
+    // CLUSTER / WORKER END
+    //
+    // Job submitted to the cluster
+    // 
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(tables.get(0).getTotalBuckets());
+    record1 = bucketIdResolver.attachBucketIdToRecord(record1);
+
+    // --------------------------------------------------------------
+    // DATA SHOULD GET SORTED BY YOUR ETL/MERGE PROCESS HERE
+    //
+    // Group the data by (partitionValues, ROW__ID.bucketId)
+    // Order the groups by (ROW__ID.lastTransactionId, ROW__ID.rowId)
+    // --------------------------------------------------------------
+    
+    // One of these runs at the output of each reducer
+    //
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(tables.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+    
+    coordinator.insert(partitionValues1, record1);
+    coordinator.update(partitionValues2, record2);
+    coordinator.delete(partitionValues3, record3);
+
+    coordinator.close();
+
+    // CLIENT/TOOL END
+    //
+    // The tasks have completed, control is back at the tool
+
+    transaction.commit();
+
+    client.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
new file mode 100644
index 0000000..0d87a31
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.Text;
+
+public class MutableRecord {
+
+  // Column 0
+  public final int id;
+  // Column 1
+  public final Text msg;
+  // Column 2
+  public RecordIdentifier rowId;
+
+  public MutableRecord(int id, String msg, RecordIdentifier rowId) {
+    this.id = id;
+    this.msg = new Text(msg);
+    this.rowId = rowId;
+  }
+
+  public MutableRecord(int id, String msg) {
+    this.id = id;
+    this.msg = new Text(msg);
+    rowId = null;
+  }
+
+  @Override
+  public String toString() {
+    return "MutableRecord [id=" + id + ", msg=" + msg + ", rowId=" + rowId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
new file mode 100644
index 0000000..2a851c8
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
@@ -0,0 +1,51 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolverImpl;
+import org.apache.hive.hcatalog.streaming.mutate.worker.Mutator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorImpl;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspector;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspectorImpl;
+
+public class ReflectiveMutatorFactory implements MutatorFactory {
+
+  private final int recordIdColumn;
+  private final ObjectInspector objectInspector;
+  private final Configuration configuration;
+  private final int[] bucketColumnIndexes;
+
+  public ReflectiveMutatorFactory(Configuration configuration, Class<?> recordClass, int recordIdColumn,
+      int[] bucketColumnIndexes) {
+    this.configuration = configuration;
+    this.recordIdColumn = recordIdColumn;
+    this.bucketColumnIndexes = bucketColumnIndexes;
+    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(recordClass,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+  }
+
+  @Override
+  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+    throws IOException {
+    return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath,
+        bucketId);
+  }
+
+  @Override
+  public RecordInspector newRecordInspector() {
+    return new RecordInspectorImpl(objectInspector, recordIdColumn);
+  }
+
+  @Override
+  public BucketIdResolver newBucketIdResolver(int totalBuckets) {
+    return new BucketIdResolverImpl(objectInspector, recordIdColumn, totalBuckets, bucketColumnIndexes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
new file mode 100644
index 0000000..477ed8c
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+
+public class StreamingAssert {
+
+  public static class Factory {
+    private IMetaStoreClient metaStoreClient;
+    private final HiveConf conf;
+
+    public Factory(IMetaStoreClient metaStoreClient, HiveConf conf) {
+      this.metaStoreClient = metaStoreClient;
+      this.conf = conf;
+    }
+
+    public StreamingAssert newStreamingAssert(Table table) throws Exception {
+      return newStreamingAssert(table, Collections.<String> emptyList());
+    }
+
+    public StreamingAssert newStreamingAssert(Table table, List<String> partition) throws Exception {
+      return new StreamingAssert(metaStoreClient, conf, table, partition);
+    }
+  }
+
+  private Table table;
+  private List<String> partition;
+  private IMetaStoreClient metaStoreClient;
+  private Directory dir;
+  private ValidTxnList txns;
+  private List<AcidUtils.ParsedDelta> currentDeltas;
+  private long min;
+  private long max;
+  private Path partitionLocation;
+
+  StreamingAssert(IMetaStoreClient metaStoreClient, HiveConf conf, Table table, List<String> partition)
+      throws Exception {
+    this.metaStoreClient = metaStoreClient;
+    this.table = table;
+    this.partition = partition;
+
+    txns = metaStoreClient.getValidTxns();
+    partitionLocation = getPartitionLocation();
+    dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+    assertEquals(0, dir.getObsolete().size());
+    assertEquals(0, dir.getOriginalFiles().size());
+
+    currentDeltas = dir.getCurrentDirectories();
+    min = Long.MAX_VALUE;
+    max = Long.MIN_VALUE;
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
+      System.out.println(parsedDelta.getPath().toString());
+      max = Math.max(parsedDelta.getMaxTransaction(), max);
+      min = Math.min(parsedDelta.getMinTransaction(), min);
+    }
+  }
+
+  public void assertExpectedFileCount(int expectedFileCount) {
+    assertEquals(expectedFileCount, currentDeltas.size());
+  }
+
+  public void assertNothingWritten() {
+    assertExpectedFileCount(0);
+  }
+
+  public void assertMinTransactionId(long expectedMinTransactionId) {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    assertEquals(expectedMinTransactionId, min);
+  }
+
+  public void assertMaxTransactionId(long expectedMaxTransactionId) {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    assertEquals(expectedMaxTransactionId, max);
+  }
+
+  List<Record> readRecords() throws Exception {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", partitionLocation.toString());
+    job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    InputSplit[] splits = inputFormat.getSplits(job, 1);
+    assertEquals(1, splits.length);
+
+    final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+        .getRecordReader(splits[0], job, Reporter.NULL);
+
+    NullWritable key = recordReader.createKey();
+    OrcStruct value = recordReader.createValue();
+
+    List<Record> records = new ArrayList<>();
+    while (recordReader.next(key, value)) {
+      RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+      Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+          recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString());
+      System.out.println(record);
+      records.add(record);
+    }
+    recordReader.close();
+    return records;
+  }
+
+  private Path getPartitionLocation() throws NoSuchObjectException, MetaException, TException {
+    Path partitionLocacation;
+    if (partition.isEmpty()) {
+      partitionLocacation = new Path(table.getSd().getLocation());
+    } else {
+      // TODO: calculate this instead. Just because we're writing to the location doesn't mean that it'll
+      // always be wanted in the meta store right away.
+      List<Partition> partitionEntries = metaStoreClient.listPartitions(table.getDbName(), table.getTableName(),
+          partition, (short) 1);
+      partitionLocacation = new Path(partitionEntries.get(0).getSd().getLocation());
+    }
+    return partitionLocacation;
+  }
+
+  public static class Record {
+    private RecordIdentifier recordIdentifier;
+    private String row;
+
+    Record(RecordIdentifier recordIdentifier, String row) {
+      this.recordIdentifier = recordIdentifier;
+      this.row = row;
+    }
+
+    public RecordIdentifier getRecordIdentifier() {
+      return recordIdentifier;
+    }
+
+    public String getRow() {
+      return row;
+    }
+
+    @Override
+    public String toString() {
+      return "Record [recordIdentifier=" + recordIdentifier + ", row=" + row + "]";
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
new file mode 100644
index 0000000..f8c8537
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
@@ -0,0 +1,261 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.thrift.TException;
+
+public class StreamingTestUtils {
+
+  public HiveConf newHiveConf(String metaStoreUri) {
+    HiveConf conf = new HiveConf(this.getClass());
+    conf.set("fs.raw.impl", RawFileSystem.class.getName());
+    if (metaStoreUri != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    return conf;
+  }
+
+  public void prepareTransactionDatabase(HiveConf conf) throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+  }
+
+  public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception {
+    return new HiveMetaStoreClient(conf);
+  }
+
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME;
+    static {
+      try {
+        NAME = new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File file = pathToFile(path);
+      if (!file.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough
+      short mod = 0;
+      if (file.canRead()) {
+        mod |= 0444;
+      }
+      if (file.canWrite()) {
+        mod |= 0200;
+      }
+      if (file.canExecute()) {
+        mod |= 0111;
+      }
+      return new FileStatus(file.length(), file.isDirectory(), 1, 1024, file.lastModified(), file.lastModified(),
+          FsPermission.createImmutable(mod), "owen", "users", path);
+    }
+  }
+
+  public static DatabaseBuilder databaseBuilder(File warehouseFolder) {
+    return new DatabaseBuilder(warehouseFolder);
+  }
+
+  public static class DatabaseBuilder {
+
+    private Database database;
+    private File warehouseFolder;
+
+    public DatabaseBuilder(File warehouseFolder) {
+      this.warehouseFolder = warehouseFolder;
+      database = new Database();
+    }
+
+    public DatabaseBuilder name(String name) {
+      database.setName(name);
+      File databaseFolder = new File(warehouseFolder, name + ".db");
+      String databaseLocation = "raw://" + databaseFolder.toURI().getPath();
+      database.setLocationUri(databaseLocation);
+      return this;
+    }
+
+    public Database dropAndCreate(IMetaStoreClient metaStoreClient) throws Exception {
+      if (metaStoreClient == null) {
+        throw new IllegalArgumentException();
+      }
+      try {
+        for (String table : metaStoreClient.listTableNamesByFilter(database.getName(), "", (short) -1)) {
+          metaStoreClient.dropTable(database.getName(), table, true, true);
+        }
+        metaStoreClient.dropDatabase(database.getName());
+      } catch (TException e) {
+      }
+      metaStoreClient.createDatabase(database);
+      return database;
+    }
+
+    public Database build() {
+      return database;
+    }
+
+  }
+
+  public static TableBuilder tableBuilder(Database database) {
+    return new TableBuilder(database);
+  }
+
+  public static class TableBuilder {
+
+    private Table table;
+    private StorageDescriptor sd;
+    private SerDeInfo serDeInfo;
+    private Database database;
+    private List<List<String>> partitions;
+    private List<String> columnNames;
+    private List<String> columnTypes;
+    private List<String> partitionKeys;
+
+    public TableBuilder(Database database) {
+      this.database = database;
+      partitions = new ArrayList<>();
+      columnNames = new ArrayList<>();
+      columnTypes = new ArrayList<>();
+      partitionKeys = Collections.emptyList();
+      table = new Table();
+      table.setDbName(database.getName());
+      table.setTableType(TableType.MANAGED_TABLE.toString());
+      Map<String, String> tableParams = new HashMap<String, String>();
+      tableParams.put("transactional", Boolean.TRUE.toString());
+      table.setParameters(tableParams);
+
+      sd = new StorageDescriptor();
+      sd.setInputFormat(HiveInputFormat.class.getName());
+      sd.setOutputFormat(OrcOutputFormat.class.getName());
+      sd.setNumBuckets(1);
+      table.setSd(sd);
+
+      serDeInfo = new SerDeInfo();
+      serDeInfo.setParameters(new HashMap<String, String>());
+      serDeInfo.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+      serDeInfo.setSerializationLib(OrcSerde.class.getName());
+      sd.setSerdeInfo(serDeInfo);
+    }
+
+    public TableBuilder name(String name) {
+      sd.setLocation(database.getLocationUri() + Path.SEPARATOR + name);
+      table.setTableName(name);
+      serDeInfo.setName(name);
+      return this;
+    }
+
+    public TableBuilder buckets(int buckets) {
+      sd.setNumBuckets(buckets);
+      return this;
+    }
+
+    public TableBuilder addColumn(String columnName, String columnType) {
+      columnNames.add(columnName);
+      columnTypes.add(columnType);
+      return this;
+    }
+
+    public TableBuilder partitionKeys(String... partitionKeys) {
+      this.partitionKeys = Arrays.asList(partitionKeys);
+      return this;
+    }
+
+    public TableBuilder addPartition(String... partitionValues) {
+      partitions.add(Arrays.asList(partitionValues));
+      return this;
+    }
+
+    public TableBuilder addPartition(List<String> partitionValues) {
+      partitions.add(partitionValues);
+      return this;
+    }
+
+    public Table create(IMetaStoreClient metaStoreClient) throws Exception {
+      if (metaStoreClient == null) {
+        throw new IllegalArgumentException();
+      }
+      return internalCreate(metaStoreClient);
+    }
+
+    public Table build() throws Exception {
+      return internalCreate(null);
+    }
+
+    private Table internalCreate(IMetaStoreClient metaStoreClient) throws Exception {
+      List<FieldSchema> fields = new ArrayList<FieldSchema>(columnNames.size());
+      for (int i = 0; i < columnNames.size(); i++) {
+        fields.add(new FieldSchema(columnNames.get(i), columnTypes.get(i), ""));
+      }
+      sd.setCols(fields);
+
+      if (!partitionKeys.isEmpty()) {
+        List<FieldSchema> partitionFields = new ArrayList<FieldSchema>();
+        for (String partitionKey : partitionKeys) {
+          partitionFields.add(new FieldSchema(partitionKey, serdeConstants.STRING_TYPE_NAME, ""));
+        }
+        table.setPartitionKeys(partitionFields);
+      }
+      if (metaStoreClient != null) {
+        metaStoreClient.createTable(table);
+      }
+
+      for (List<String> partitionValues : partitions) {
+        Partition partition = new Partition();
+        partition.setDbName(database.getName());
+        partition.setTableName(table.getTableName());
+        StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+        partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+            + Warehouse.makePartName(table.getPartitionKeys(), partitionValues));
+        partition.setSd(partitionSd);
+        partition.setValues(partitionValues);
+
+        if (metaStoreClient != null) {
+          metaStoreClient.add_partition(partition);
+        }
+      }
+      return table;
+    }
+  }
+
+}