You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/07/01 00:01:07 UTC

[1/3] hive git commit: HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates)

Repository: hive
Updated Branches:
  refs/heads/master 3991dba30 -> 994d98c09


http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
new file mode 100644
index 0000000..703cef6
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.ABORTED;
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.COMMITTED;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.databaseBuilder;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.tableBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hive.hcatalog.streaming.TestStreaming;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Record;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.TableBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is based on {@link TestStreaming} and has a similar core set of tests to ensure that basic transactional
+ * behaviour is as expected in the {@link RecordMutator} line. This is complemented with a set of tests related to the
+ * use of update and delete operations.
+ */
+public class TestMutations {
+
+  private static final List<String> EUROPE_FRANCE = Arrays.asList("Europe", "France");
+  private static final List<String> EUROPE_UK = Arrays.asList("Europe", "UK");
+  private static final List<String> ASIA_INDIA = Arrays.asList("Asia", "India");
+  // id
+  private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+  private static final int RECORD_ID_COLUMN = 2;
+
+  @Rule
+  public TemporaryFolder warehouseFolder = new TemporaryFolder();
+
+  private StreamingTestUtils testUtils = new StreamingTestUtils();
+  private HiveConf conf;
+  private IMetaStoreClient metaStoreClient;
+  private String metaStoreUri;
+  private Database database;
+  private TableBuilder partitionedTableBuilder;
+  private TableBuilder unpartitionedTableBuilder;
+  private Factory assertionFactory;
+
+  public TestMutations() throws Exception { // builds the fixtures shared by the test methods of this instance
+    conf = testUtils.newHiveConf(metaStoreUri); // metaStoreUri is never assigned in this class, so null is passed
+    testUtils.prepareTransactionDatabase(conf);
+    metaStoreClient = testUtils.newMetaStoreClient(conf);
+    assertionFactory = new StreamingAssert.Factory(metaStoreClient, conf); // tests use it to inspect written data
+  }
+
+  @Before
+  public void setup() throws Exception {
+    database = databaseBuilder(warehouseFolder.getRoot()).name("testing").dropAndCreate(metaStoreClient);
+    // Builders only: each test chooses its partitions and calls create(...) on the builder itself.
+    partitionedTableBuilder = tableBuilder(database)
+        .name("partitioned")
+        .addColumn("id", "int")
+        .addColumn("msg", "string")
+        .partitionKeys("continent", "country");
+    // Same two columns as the builder above, but without partition keys.
+    unpartitionedTableBuilder = tableBuilder(database)
+        .name("unpartitioned")
+        .addColumn("id", "int")
+        .addColumn("msg", "string");
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+    // NOTE(review): the boolean passed to addSinkTable is presumably "createPartitions" - confirm in the builder.
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    transaction.begin();
+    // Commit straight away: no coordinator is created and no record is written in this transaction.
+    transaction.commit();
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommitUnpartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+    // Same scenario as the partitioned empty-commit test, but the sink is registered with 'false'.
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    transaction.begin();
+    // Commit straight away: no coordinator is created and no record is written in this transaction.
+    transaction.commit();
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbortPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+    // Scenario: open a transaction, build a coordinator, write nothing, then abort.
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+    // The coordinator is closed without any insert, update or delete having been issued.
+    coordinator.close();
+
+    transaction.abort();
+    assertThat(transaction.getState(), is(ABORTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbortUnartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+    // NOTE(review): method name is missing a 'p' ("Unartitioned"); left unchanged to keep the reported test name.
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+    // The coordinator is closed without any insert, update or delete having been issued.
+    coordinator.close();
+
+    transaction.abort();
+    assertThat(transaction.getState(), is(ABORTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommitPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+    // The bucket id is attached to the record before the record is passed to the coordinator.
+    BucketIdResolver bucketIdAppender = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    coordinator.insert(ASIA_INDIA, record);
+    coordinator.close();
+
+    transaction.commit();
+    // Inspect what was written to the ASIA_INDIA partition: one file, one row, all in transaction 1.
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testMulti() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+    // Only ASIA_INDIA is added up front; EUROPE_UK and EUROPE_FRANCE are written to without being added here.
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Hello streaming"));
+    MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3,
+        "Hello streaming"));
+    MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4,
+        "Bonjour streaming"));
+    // A single transaction and a single coordinator write to three different partitions.
+    coordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+    coordinator.insert(EUROPE_UK, europeUkRecord1);
+    coordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+    coordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+    coordinator.close();
+
+    transaction.commit();
+
+    // ASIA_INDIA
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    // EUROPE_UK
+    streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    // EUROPE_FRANCE
+    streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(2));
+    assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}"));
+    assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommitUnpartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    // The unpartitioned table is addressed with an empty list of partition values.
+    coordinator.insert(Collections.<String> emptyList(), record);
+    coordinator.close();
+
+    transaction.commit();
+    // No partition values are passed when creating the assertion helper for the unpartitioned table.
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchAbort() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    MutableRecord record2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Welcome to streaming"));
+
+    coordinator.insert(ASIA_INDIA, record1);
+    coordinator.insert(ASIA_INDIA, record2);
+    coordinator.close();
+    // Two records were handed to the coordinator, but the transaction is aborted rather than committed.
+    transaction.abort();
+
+    assertThat(transaction.getState(), is(ABORTED));
+
+    client.close();
+    // The partition is inspected only after the client has been closed.
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertNothingWritten();
+  }
+
+  @Test
+  public void testUpdatesAndDeletes() throws Exception {
+    // Set up some base data then stream some inserts/updates/deletes to a number of partitions
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+
+    // INSERT DATA
+    // First transaction: two records each for ASIA_INDIA, EUROPE_UK and EUROPE_FRANCE.
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).addPartition(EUROPE_FRANCE).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction insertTransaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    insertTransaction.begin();
+
+    MutatorCoordinator insertCoordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Namaste streaming 1"));
+    MutableRecord asiaIndiaRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Namaste streaming 2"));
+    MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3,
+        "Hello streaming 1"));
+    MutableRecord europeUkRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4,
+        "Hello streaming 2"));
+    MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(5,
+        "Bonjour streaming 1"));
+    MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(6,
+        "Bonjour streaming 2"));
+
+    insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+    insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord2);
+    insertCoordinator.insert(EUROPE_UK, europeUkRecord1);
+    insertCoordinator.insert(EUROPE_UK, europeUkRecord2);
+    insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+    insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+    insertCoordinator.close();
+
+    insertTransaction.commit();
+
+    assertThat(insertTransaction.getState(), is(COMMITTED));
+    client.close();
+
+    // MUTATE DATA
+    // Second transaction, on a newly built client: update, insert and delete against the rows written above.
+    client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction mutateTransaction = client.newTransaction();
+
+    destinations = client.getTables();
+
+    mutateTransaction.begin();
+
+    MutatorCoordinator mutateCoordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord3 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(20,
+        "Namaste streaming 3"));
+    // Updates/deletes pass records built with an explicit RecordIdentifier; the insert uses the resolved record.
+    mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L,
+        0, 1L)));
+    mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
+    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.delete(EUROPE_FRANCE,
+        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier(
+        1L, 0, 1L)));
+    mutateCoordinator.close();
+
+    mutateTransaction.commit();
+
+    assertThat(mutateTransaction.getState(), is(COMMITTED));
+    // Expected: updated rows keep their RecordIdentifier from transaction 1; the new row is identified as (2, 0, 0).
+    StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    indiaAssertions.assertMinTransactionId(1L);
+    indiaAssertions.assertMaxTransactionId(2L);
+    List<Record> indiaRecords = indiaAssertions.readRecords();
+    assertThat(indiaRecords.size(), is(3));
+    assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
+    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
+    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L)));
+
+    StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+    ukAssertions.assertMinTransactionId(1L);
+    ukAssertions.assertMaxTransactionId(2L);
+    List<Record> ukRecords = ukAssertions.readRecords();
+    assertThat(ukRecords.size(), is(1));
+    assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
+    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+    franceAssertions.assertMinTransactionId(1L);
+    franceAssertions.assertMaxTransactionId(2L);
+    List<Record> franceRecords = franceAssertions.readRecords();
+    assertThat(franceRecords.size(), is(1));
+    assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    client.close();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
new file mode 100644
index 0000000..706697a
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
@@ -0,0 +1,66 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils;
+import org.junit.Test;
+
+public class TestAcidTableSerializer {
+
+  @Test
+  public void testSerializeDeserialize() throws Exception {
+    Database database = StreamingTestUtils.databaseBuilder(new File("/tmp")).name("db_1").build();
+    Table table = StreamingTestUtils
+        .tableBuilder(database)
+        .name("table_1")
+        .addColumn("one", "string")
+        .addColumn("two", "integer")
+        .partitionKeys("partition")
+        .addPartition("p1")
+        .buckets(10)
+        .build();
+    // Populate every optional part of the AcidTable (table metadata and transaction id) before encoding.
+    AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
+    acidTable.setTable(table);
+    acidTable.setTransactionId(42L);
+
+    String encoded = AcidTableSerializer.encode(acidTable);
+    // Round trip: the decoded instance must expose the same values that were set above.
+    AcidTable decoded = AcidTableSerializer.decode(encoded);
+
+    assertThat(decoded.getDatabaseName(), is("db_1"));
+    assertThat(decoded.getTableName(), is("table_1"));
+    assertThat(decoded.createPartitions(), is(true));
+    assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"));
+    assertThat(decoded.getTotalBuckets(), is(10));
+    assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
+    assertThat(decoded.getTransactionId(), is(42L));
+    assertThat(decoded.getTableType(), is(TableType.SINK));
+    assertThat(decoded.getTable(), is(table));
+  }
+
+  @Test
+  public void testSerializeDeserializeNoTableNoTransaction() throws Exception {
+    AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
+    // Neither setTable(...) nor setTransactionId(...) is called before encoding.
+    String encoded = AcidTableSerializer.encode(acidTable);
+    AcidTable decoded = AcidTableSerializer.decode(encoded);
+    // Table-derived values are expected to come back as null / 0, and the transaction id as 0.
+    assertThat(decoded.getDatabaseName(), is("db_1"));
+    assertThat(decoded.getTableName(), is("table_1"));
+    assertThat(decoded.createPartitions(), is(true));
+    assertThat(decoded.getOutputFormatName(), is(nullValue()));
+    assertThat(decoded.getTotalBuckets(), is(0));
+    assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
+    assertThat(decoded.getTransactionId(), is(0L));
+    assertThat(decoded.getTableType(), is(TableType.SINK));
+    assertThat(decoded.getTable(), is(nullValue()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
new file mode 100644
index 0000000..ca3f7b2
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
@@ -0,0 +1,176 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorClient {
+
+  private static final long TRANSACTION_ID = 42L;
+  private static final String TABLE_NAME_1 = "TABLE_1";
+  private static final String TABLE_NAME_2 = "TABLE_2";
+  private static final String DB_NAME = "DB_1";
+  private static final String USER = "user";
+  private static final AcidTable TABLE_1 = new AcidTable(DB_NAME, TABLE_NAME_1, true, TableType.SINK);
+  private static final AcidTable TABLE_2 = new AcidTable(DB_NAME, TABLE_NAME_2, true, TableType.SINK);
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private Lock mockLock;
+  @Mock
+  private Table mockTable1, mockTable2;
+  @Mock
+  private StorageDescriptor mockSd;
+  @Mock
+  private Map<String, String> mockParameters;
+  @Mock
+  private HiveConf mockConfiguration;
+  @Mock
+  private LockFailureListener mockLockFailureListener;
+
+  private MutatorClient client;
+
+  @Before
+  public void configureMocks() throws Exception {
+    when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_1)).thenReturn(mockTable1);
+    when(mockTable1.getDbName()).thenReturn(DB_NAME);
+    when(mockTable1.getTableName()).thenReturn(TABLE_NAME_1);
+    when(mockTable1.getSd()).thenReturn(mockSd);
+    when(mockTable1.getParameters()).thenReturn(mockParameters);
+    when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_2)).thenReturn(mockTable2);
+    when(mockTable2.getDbName()).thenReturn(DB_NAME);
+    when(mockTable2.getTableName()).thenReturn(TABLE_NAME_2);
+    when(mockTable2.getSd()).thenReturn(mockSd);
+    when(mockTable2.getParameters()).thenReturn(mockParameters);
+    when(mockSd.getNumBuckets()).thenReturn(1, 2); // consecutive stubbing: first call yields 1, later calls yield 2
+    when(mockSd.getOutputFormat()).thenReturn(OrcOutputFormat.class.getName());
+    when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
+    // Both mock tables share the same StorageDescriptor mock and the same parameter-map mock.
+    when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+    // Default client under test targets TABLE_1 only; some tests replace it with a two-table client.
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER,
+        Collections.singletonList(TABLE_1));
+  }
+
+  @Test
+  public void testCheckValidTableConnect() throws Exception {
+    List<AcidTable> inTables = new ArrayList<>();
+    inTables.add(TABLE_1);
+    inTables.add(TABLE_2);
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables);
+
+    client.connect();
+    List<AcidTable> outTables = client.getTables();
+    // NOTE(review): both tables are expected to report 2 buckets although the stub yields 1 first - confirm why.
+    assertThat(client.isConnected(), is(true));
+    assertThat(outTables.size(), is(2));
+    assertThat(outTables.get(0).getDatabaseName(), is(DB_NAME));
+    assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1));
+    assertThat(outTables.get(0).getTotalBuckets(), is(2));
+    assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+    assertThat(outTables.get(0).getTransactionId(), is(0L));
+    assertThat(outTables.get(0).getTable(), is(mockTable1));
+    assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME));
+    assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2));
+    assertThat(outTables.get(1).getTotalBuckets(), is(2));
+    assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+    assertThat(outTables.get(1).getTransactionId(), is(0L));
+    assertThat(outTables.get(1).getTable(), is(mockTable2));
+  }
+
+  @Test
+  public void testCheckNonTransactionalTableConnect() throws Exception {
+    when(mockParameters.get("transactional")).thenReturn(Boolean.FALSE.toString());
+    // connect() is expected to throw; fail() guards against it returning normally.
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) { // expected - nothing to do
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test
+  public void testCheckUnBucketedTableConnect() throws Exception {
+    when(mockSd.getNumBuckets()).thenReturn(0);
+    // connect() is expected to throw; fail() guards against it returning normally.
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) { // expected - nothing to do
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test
+  public void testMetaStoreFailsOnConnect() throws Exception {
+    when(mockMetaStoreClient.getTable(anyString(), anyString())).thenThrow(new TException());
+    // The TException from the meta store is expected to surface from connect() as a ConnectionException.
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) { // expected - nothing to do
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test(expected = ConnectionException.class)
+  public void testGetDestinationsFailsIfNotConnected() throws Exception {
+    client.getTables(); // connect() has not been called on the client built in configureMocks()
+  }
+
+  @Test
+  public void testNewTransaction() throws Exception {
+    List<AcidTable> inTables = new ArrayList<>();
+    inTables.add(TABLE_1);
+    inTables.add(TABLE_2);
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables);
+
+    client.connect();
+    Transaction transaction = client.newTransaction();
+    List<AcidTable> outTables = client.getTables();
+
+    assertThat(client.isConnected(), is(true));
+    // TRANSACTION_ID is the value stubbed for openTxn(USER); both tables are expected to carry it.
+    assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+    assertThat(transaction.getState(), is(TxnState.INACTIVE));
+    assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID));
+    assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID));
+  }
+
+  @Test
+  public void testCloseClosesClient() throws Exception {
+    client.close(); // called on a client that was never connected
+    assertThat(client.isConnected(), is(false));
+    verify(mockMetaStoreClient).close(); // the underlying meta store client must have been closed as well
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
new file mode 100644
index 0000000..179207a
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
@@ -0,0 +1,95 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link Transaction}. Each test checks the state the transaction reports and the calls it makes on
+ * a mocked {@link Lock} and a mocked {@link IMetaStoreClient} when it is begun, committed or aborted.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestTransaction {
+
+  private static final String USER = "user";
+  private static final long TRANSACTION_ID = 10L;
+
+  @Mock
+  private Lock mockLock;
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+
+  private Transaction transaction;
+
+  /**
+   * Creates the transaction under test. The mocked lock reports {@link #USER} and the mocked meta store hands out
+   * {@link #TRANSACTION_ID} when a transaction is opened for that user.
+   */
+  @Before
+  public void createTransaction() throws Exception {
+    when(mockLock.getUser()).thenReturn(USER);
+    when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+    transaction = new Transaction(mockMetaStoreClient, mockLock);
+  }
+
+  /** A freshly created transaction is INACTIVE and exposes the id returned by the meta store. */
+  @Test
+  public void testInitialState() {
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+    assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+  }
+
+  /** Beginning the transaction acquires the lock for the transaction id and moves the state to OPEN. */
+  @Test
+  public void testBegin() throws Exception {
+    transaction.begin();
+
+    verify(mockLock).acquire(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.OPEN));
+  }
+
+  /**
+   * When the lock cannot be acquired, {@code begin()} must raise a {@link TransactionException} and the transaction
+   * must remain INACTIVE.
+   */
+  @Test
+  public void testBeginLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).acquire(TRANSACTION_ID);
+
+    try {
+      transaction.begin();
+      // Without this, a begin() that silently swallowed the lock failure would still pass the state check below.
+      org.junit.Assert.fail("Expected a TransactionException because the lock could not be acquired.");
+    } catch (TransactionException expected) {
+      // Expected: the lock failure is reported to the caller.
+    }
+
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+  }
+
+  /** Committing releases the lock, commits the transaction in the meta store and moves the state to COMMITTED. */
+  @Test
+  public void testCommit() throws Exception {
+    transaction.commit();
+
+    verify(mockLock).release();
+    verify(mockMetaStoreClient).commitTxn(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.COMMITTED));
+  }
+
+  /** A failure to release the lock during commit must surface as a {@link TransactionException}. */
+  @Test(expected = TransactionException.class)
+  public void testCommitLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).release();
+    transaction.commit();
+  }
+
+  /** Aborting releases the lock, rolls the transaction back in the meta store and moves the state to ABORTED. */
+  @Test
+  public void testAbort() throws Exception {
+    transaction.abort();
+
+    verify(mockLock).release();
+    verify(mockMetaStoreClient).rollbackTxn(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.ABORTED));
+  }
+
+  /** A failure to release the lock during abort must surface as a {@link TransactionException}. */
+  @Test(expected = TransactionException.class)
+  public void testAbortLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).release();
+    transaction.abort();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
new file mode 100644
index 0000000..8e6d06e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
@@ -0,0 +1,100 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link HeartbeatTimerTask}. The task is run directly (no timer involved) against a mocked
+ * {@link IMetaStoreClient}; failures are observed through a mocked {@link LockFailureListener}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestHeartbeatTimerTask {
+
+  private static final long TRANSACTION_ID = 10L;
+  private static final long LOCK_ID = 1L;
+  private static final List<Table> TABLES = createTable();
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private LockFailureListener mockListener;
+
+  private HeartbeatTimerTask task;
+
+  /** Creates a task for {@link #TRANSACTION_ID} and {@link #LOCK_ID} over the single table in {@link #TABLES}. */
+  @Before
+  public void create() throws Exception {
+    task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+  }
+
+  /** Running the task sends one heartbeat carrying the transaction id and the lock id. */
+  @Test
+  public void testRun() throws Exception {
+    task.run();
+
+    verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+  }
+
+  /** A task created with a {@code null} transaction id sends its heartbeat with a transaction id of 0. */
+  @Test
+  public void testRunNullTransactionId() throws Exception {
+    task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+
+    task.run();
+
+    verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+  }
+
+  /** A {@link NoSuchLockException} from the heartbeat is reported to the listener with the qualified table name. */
+  @Test
+  public void testRunHeartbeatFailsNoSuchLockException() throws Exception {
+    NoSuchLockException exception = new NoSuchLockException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  /** A {@link NoSuchTxnException} from the heartbeat is reported to the listener with the qualified table name. */
+  @Test
+  public void testRunHeartbeatFailsNoSuchTxnException() throws Exception {
+    NoSuchTxnException exception = new NoSuchTxnException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  /** A {@link TxnAbortedException} from the heartbeat is reported to the listener with the qualified table name. */
+  @Test
+  public void testRunHeartbeatFailsTxnAbortedException() throws Exception {
+    TxnAbortedException exception = new TxnAbortedException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  /**
+   * A plain {@link TException} from the heartbeat must not escape {@code run()} and must not be treated as a lock
+   * failure, so the listener is expected to stay untouched.
+   */
+  @Test
+  public void testRunHeartbeatFailsTException() throws Exception {
+    TException exception = new TException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    // Fully qualified so that no additional static import is needed in this file.
+    org.mockito.Mockito.verifyZeroInteractions(mockListener);
+  }
+
+  /** Builds the fixture table list: a single table named TABLE in database DB. */
+  private static List<Table> createTable() {
+    Table table = new Table();
+    table.setDbName("DB");
+    table.setTableName("TABLE");
+    return Arrays.asList(table);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
new file mode 100644
index 0000000..ef1e80c
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -0,0 +1,283 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.apache.hadoop.hive.metastore.api.LockState.ABORT;
+import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.NOT_ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.WAITING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Timer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Unit tests for {@link Lock} and for the {@link HeartbeatTimerTask} it relies on. The meta store, the lock
+ * response, the heartbeat factory and the heartbeat timer are all mocked; by default the mocked meta store grants
+ * the lock immediately with id {@link #LOCK_ID}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestLock {
+
+  private static final Table TABLE_1 = createTable("DB", "ONE");
+  private static final Table TABLE_2 = createTable("DB", "TWO");
+  private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+  private static final long LOCK_ID = 42;
+  private static final long TRANSACTION_ID = 109;
+  private static final String USER = "ewest";
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private LockFailureListener mockListener;
+  @Mock
+  private LockResponse mockLockResponse;
+  @Mock
+  private HeartbeatFactory mockHeartbeatFactory;
+  @Mock
+  private Timer mockHeartbeat;
+  @Captor
+  private ArgumentCaptor<LockRequest> requestCaptor;
+
+  private Lock lock;
+  private HiveConf configuration = new HiveConf();
+
+  /**
+   * Stubs the happy path (lock granted at once, heartbeat factory returning the mocked timer) and creates the lock
+   * under test for {@link #USER} over {@link #TABLES}.
+   */
+  @Before
+  public void injectMocks() throws Exception {
+    when(mockMetaStoreClient.lock(any(LockRequest.class))).thenReturn(mockLockResponse);
+    when(mockLockResponse.getLockid()).thenReturn(LOCK_ID);
+    when(mockLockResponse.getState()).thenReturn(ACQUIRED);
+    when(
+        mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class),
+            any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat);
+
+    lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0);
+  }
+
+  /** Acquiring without a transaction records the lock id and leaves the transaction id unset. */
+  @Test
+  public void testAcquireReadLockWithNoIssues() throws Exception {
+    lock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+    assertNull(lock.getTransactionId());
+  }
+
+  /** Acquiring for a transaction records both the lock id and the transaction id. */
+  @Test
+  public void testAcquireTxnLockWithNoIssues() throws Exception {
+    lock.acquire(TRANSACTION_ID);
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+    assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+  }
+
+  /** With a 100s transaction timeout configured, the heartbeat for a read lock is requested with a period of 75. */
+  @Test
+  public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
+    configuration.set("hive.txn.timeout", "100s");
+    lock.acquire();
+
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES),
+        eq(LOCK_ID), eq(75));
+  }
+
+  /** With a 100s transaction timeout configured, the heartbeat for a txn lock is requested with a period of 75. */
+  @Test
+  public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
+    configuration.set("hive.txn.timeout", "100s");
+    lock.acquire(TRANSACTION_ID);
+
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
+        eq(LOCK_ID), eq(75));
+  }
+
+  /** The lock request sent to the meta store carries the user the lock was created for. */
+  @Test
+  public void testAcquireLockCheckUser() throws Exception {
+    lock.acquire();
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+    LockRequest actualRequest = requestCaptor.getValue();
+    assertEquals(USER, actualRequest.getUser());
+  }
+
+  /**
+   * A read lock request has transaction id 0, the user, the local host name and one SHARED_READ table-level
+   * component per table.
+   */
+  @Test
+  public void testAcquireReadLockCheckLocks() throws Exception {
+    lock.acquire();
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+    LockRequest request = requestCaptor.getValue();
+    assertEquals(0, request.getTxnid());
+    assertEquals(USER, request.getUser());
+    assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+    List<LockComponent> components = request.getComponent();
+
+    assertEquals(2, components.size());
+
+    LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected1.setTablename("ONE");
+    assertTrue(components.contains(expected1));
+
+    LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected2.setTablename("TWO");
+    assertTrue(components.contains(expected2));
+  }
+
+  /**
+   * A transactional lock request has the transaction id, the user, the local host name and one SHARED_READ
+   * table-level component per table.
+   */
+  @Test
+  public void testAcquireTxnLockCheckLocks() throws Exception {
+    lock.acquire(TRANSACTION_ID);
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+    LockRequest request = requestCaptor.getValue();
+    assertEquals(TRANSACTION_ID, request.getTxnid());
+    assertEquals(USER, request.getUser());
+    assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+    List<LockComponent> components = request.getComponent();
+
+    assertEquals(2, components.size());
+
+    LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected1.setTablename("ONE");
+    assertTrue(components.contains(expected1));
+
+    LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected2.setTablename("TWO");
+    assertTrue(components.contains(expected2));
+  }
+
+  /** A NOT_ACQUIRED response makes acquisition fail with a {@link LockException}. */
+  @Test(expected = LockException.class)
+  public void testAcquireLockNotAcquired() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
+    lock.acquire();
+  }
+
+  /** An ABORT response makes acquisition fail with a {@link LockException}. */
+  @Test(expected = LockException.class)
+  public void testAcquireLockAborted() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(ABORT);
+    lock.acquire();
+  }
+
+  /** Three consecutive WAITING responses make acquisition fail with a {@link LockException}. */
+  @Test(expected = LockException.class)
+  public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
+    lock.acquire();
+  }
+
+  /** Two WAITING responses followed by ACQUIRED still result in a held lock. */
+  @Test
+  public void testAcquireLockWithWaitRetries() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
+    lock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+  }
+
+  /** Releasing a held lock unlocks it in the meta store by lock id. */
+  @Test
+  public void testReleaseLock() throws Exception {
+    lock.acquire();
+    lock.release();
+    verify(mockMetaStoreClient).unlock(LOCK_ID);
+  }
+
+  /** Releasing a lock that was never acquired does not touch the meta store. */
+  @Test
+  public void testReleaseLockNoLock() throws Exception {
+    lock.release();
+    verifyNoMoreInteractions(mockMetaStoreClient);
+  }
+
+  /** Releasing a held lock cancels the heartbeat timer obtained from the factory. */
+  @Test
+  public void testReleaseLockCancelsHeartbeat() throws Exception {
+    lock.acquire();
+    lock.release();
+    verify(mockHeartbeat).cancel();
+  }
+
+  /** A heartbeat task without a transaction id sends its heartbeat with a transaction id of 0. */
+  @Test
+  public void testReadHeartbeat() throws Exception {
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+    task.run();
+    verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+  }
+
+  /** A heartbeat task with a transaction id sends that id along with the lock id. */
+  @Test
+  public void testTxnHeartbeat() throws Exception {
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+  }
+
+  /** For a read lock, a {@link NoSuchLockException} is reported to the listener with a null transaction id. */
+  @Test
+  public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
+    Throwable t = new NoSuchLockException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+  }
+
+  /** For a transactional lock, a {@link NoSuchLockException} is reported to the listener with the transaction id. */
+  @Test
+  public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
+    Throwable t = new NoSuchLockException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  /** A {@link NoSuchTxnException} from the heartbeat is reported to the listener. */
+  @Test
+  public void testHeartbeatFailsNoSuchTxnException() throws Exception {
+    Throwable t = new NoSuchTxnException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  /** A {@link TxnAbortedException} from the heartbeat is reported to the listener. */
+  @Test
+  public void testHeartbeatFailsTxnAbortedException() throws Exception {
+    Throwable t = new TxnAbortedException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  /** A plain {@link TException} from the heartbeat is not treated as a lock failure: the listener stays untouched. */
+  @Test
+  public void testHeartbeatContinuesTException() throws Exception {
+    Throwable t = new TException();
+    // The stub must use the same transaction id as the task below, otherwise the exception is never thrown and the
+    // test passes without exercising the failure path.
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verifyZeroInteractions(mockListener);
+  }
+
+  /** Builds a meta store {@link Table} carrying only the given database and table names. */
+  private static Table createTable(String databaseName, String tableName) {
+    Table table = new Table();
+    table.setDbName(databaseName);
+    table.setTableName(tableName);
+    return table;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
new file mode 100644
index 0000000..f81373e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -0,0 +1,38 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BucketIdResolverImpl}, driven by a reflection-based object inspector over
+ * {@link MutableRecord}.
+ */
+public class TestBucketIdResolverImpl {
+
+  private static final int TOTAL_BUCKETS = 12;
+  private static final int RECORD_ID_COLUMN = 2;
+  // id - TODO: use a non-zero index to check for offset errors.
+  private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+
+  private final BucketIdResolver resolver = newResolver(BUCKET_COLUMN_INDEXES);
+
+  /** A record with id 1 is given a row id with bucket 8; its id and message are left as they were. */
+  @Test
+  public void testAttachBucketIdToRecord() {
+    MutableRecord input = new MutableRecord(1, "hello");
+
+    resolver.attachBucketIdToRecord(input);
+
+    RecordIdentifier expectedRowId = new RecordIdentifier(-1L, 8, -1L);
+    assertThat(input.rowId, is(expectedRowId));
+    assertThat(input.id, is(1));
+    assertThat(input.msg.toString(), is("hello"));
+  }
+
+  /** Constructing a resolver with an empty bucket column array must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoBucketColumns() {
+    newResolver(new int[0]);
+  }
+
+  /** Creates a resolver over {@link MutableRecord} that uses the given columns for bucketing. */
+  private static BucketIdResolver newResolver(int[] bucketColumnIndexes) {
+    return new BucketIdResolverImpl(ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, bucketColumnIndexes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
new file mode 100644
index 0000000..74fa59b
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
@@ -0,0 +1,70 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link GroupingValidator#isInSequence}. Each test presents a series of groups, each made of a list
+ * of partition values plus a bucket id, and checks whether the validator accepts every step.
+ */
+public class TestGroupingValidator {
+
+  private GroupingValidator validator = new GroupingValidator();
+
+  /** Three distinct groups are all accepted, regardless of their order. */
+  @Test
+  public void uniqueGroups() {
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("c", "C", 3));
+    assertTrue(visit("b", "B", 2));
+  }
+
+  /** Presenting the same group repeatedly without interruption is accepted. */
+  @Test
+  public void sameGroup() {
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("a", "A", 1));
+  }
+
+  /** Returning to a group after a different group has been seen is rejected. */
+  @Test
+  public void revisitedGroup() {
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("c", "C", 3));
+    assertFalse(visit("a", "A", 1));
+  }
+
+  /** A previously seen partition combined with a new bucket id is accepted. */
+  @Test
+  public void samePartitionDifferentBucket() {
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("c", "C", 3));
+    assertTrue(visit("a", "A", 2));
+  }
+
+  /** A previously seen bucket id combined with a new partition is accepted. */
+  @Test
+  public void sameBucketDifferentPartition() {
+    assertTrue(visit("a", "A", 1));
+    assertTrue(visit("c", "C", 3));
+    assertTrue(visit("b", "B", 1));
+  }
+
+  /** With no partition values, three distinct bucket ids are all accepted. */
+  @Test
+  public void uniqueGroupsNoPartition() {
+    assertTrue(visitUnpartitioned(1));
+    assertTrue(visitUnpartitioned(3));
+    assertTrue(visitUnpartitioned(2));
+  }
+
+  /** With no partition values, repeating the same bucket id without interruption is accepted. */
+  @Test
+  public void sameGroupNoPartition() {
+    assertTrue(visitUnpartitioned(1));
+    assertTrue(visitUnpartitioned(1));
+    assertTrue(visitUnpartitioned(1));
+  }
+
+  /** With no partition values, returning to a bucket id after a different one has been seen is rejected. */
+  @Test
+  public void revisitedGroupNoPartition() {
+    assertTrue(visitUnpartitioned(1));
+    assertTrue(visitUnpartitioned(3));
+    assertFalse(visitUnpartitioned(1));
+  }
+
+  /** Presents a group with two partition values and the given bucket id to the validator. */
+  private boolean visit(String firstValue, String secondValue, int bucketId) {
+    return validator.isInSequence(Arrays.asList(firstValue, secondValue), bucketId);
+  }
+
+  /** Presents a group with an empty partition value list and the given bucket id to the validator. */
+  private boolean visitUnpartitioned(int bucketId) {
+    return validator.isInSequence(Collections.<String> emptyList(), bucketId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
new file mode 100644
index 0000000..6e9ffa2
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -0,0 +1,234 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link MutatorCoordinator}. All collaborators are mocked; the tests check which partitions are
+ * created, which mutators are requested from the factory as the partition or bucket changes, and which exceptions
+ * are raised for invalid record sequences.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorCoordinator {
+
+  private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
+  private static final List<String> PARTITION_B = Arrays.asList("B");
+  private static final List<String> PARTITION_A = Arrays.asList("A");
+  private static final long TRANSACTION_ID = 2L;
+  private static final int BUCKET_ID = 0;
+  private static final Path PATH_A = new Path("X");
+  private static final Path PATH_B = new Path("B");
+  private static final Object RECORD = "RECORD";
+  private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, 0L);
+  private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, 1L);
+  private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, 0L);
+  private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L);
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private MutatorFactory mockMutatorFactory;
+  @Mock
+  private CreatePartitionHelper mockPartitionHelper;
+  @Mock
+  private GroupingValidator mockGroupingValidator;
+  @Mock
+  private SequenceValidator mockSequenceValidator;
+  @Mock
+  private AcidTable mockAcidTable;
+  @Mock
+  private RecordInspector mockRecordInspector;
+  @Mock
+  private BucketIdResolver mockBucketIdResolver;
+  @Mock
+  private Mutator mockMutator;
+
+  private MutatorCoordinator coordinator;
+
+  private HiveConf configuration = new HiveConf();
+
+  /**
+   * Stubs the default scenario (single-bucket ORC table that creates partitions, every partition resolving to
+   * {@link #PATH_A}, every record looking like an insert, both validators accepting everything) and creates the
+   * coordinator under test.
+   */
+  @Before
+  public void createCoordinator() throws Exception {
+    when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
+    when(mockAcidTable.getTotalBuckets()).thenReturn(1);
+    when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID);
+    when(mockAcidTable.createPartitions()).thenReturn(true);
+    when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
+    when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver);
+    when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), anyLong(), any(Path.class), anyInt())).thenReturn(
+        mockMutator);
+    when(mockPartitionHelper.getPathForPartition(any(List.class))).thenReturn(PATH_A);
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_INSERT);
+    when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true);
+    when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true);
+
+    coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper,
+        mockGroupingValidator, mockSequenceValidator, mockAcidTable, false);
+  }
+
+  /** A single insert creates the partition if needed, requests one mutator and forwards the record to it. */
+  @Test
+  public void insert() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator).insert(RECORD);
+  }
+
+  /** Repeated inserts into the same partition and bucket reuse one mutator and create the partition only once. */
+  @Test
+  public void multipleInserts() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.insert(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator, times(3)).insert(RECORD);
+  }
+
+  /** Inserting into a second partition creates that partition and requests a mutator for its path. */
+  @Test
+  public void insertPartitionChanges() throws Exception {
+    when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+    when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+    coordinator.insert(PARTITION_A, RECORD);
+    coordinator.insert(PARTITION_B, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+    verify(mockMutator, times(2)).insert(RECORD);
+  }
+
+  /** A change of bucket within the same partition requests a second mutator for the new bucket. */
+  @Test
+  public void bucketChanges() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B1_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 1);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory)
+        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+    verify(mockMutator).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+  }
+
+  /**
+   * Walks through a partition change followed by bucket changes and checks the mutators requested for each
+   * partition/bucket combination, the operations forwarded, and that the sequence validator is reset four times.
+   */
+  @Test
+  public void partitionThenBucketChanges() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B0_R1, ROW__ID_B1_R0,
+        ROW__ID_INSERT);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0, 1, 0);
+
+    when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+    when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+    coordinator.update(PARTITION_A, RECORD);
+    coordinator.delete(PARTITION_B, RECORD);
+    coordinator.update(PARTITION_B, RECORD);
+    coordinator.insert(PARTITION_B, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+        eq(BUCKET_ID));
+    verify(mockMutatorFactory)
+        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+    verify(mockMutator, times(2)).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+    verify(mockMutator).insert(RECORD);
+    verify(mockSequenceValidator, times(4)).reset();
+  }
+
+  /**
+   * When the sequence validator rejects the record, a {@link RecordSequenceException} is expected.
+   * <p>
+   * The expected exception leaves the test method at the throwing call, so nothing placed after the calls below
+   * could run in a passing test; no verifications are attempted for that reason.
+   */
+  @Test(expected = RecordSequenceException.class)
+  public void outOfSequence() throws Exception {
+    when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+  }
+
+  /**
+   * When the grouping validator rejects the group, a {@link GroupRevisitedException} is expected.
+   * <p>
+   * The expected exception leaves the test method at the throwing call, so nothing placed after the calls below
+   * could run in a passing test; no verifications are attempted for that reason.
+   */
+  @Test(expected = GroupRevisitedException.class)
+  public void revisitGroup() throws Exception {
+    when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+  }
+
+  /** An insert whose record identifier bucket differs from the computed bucket is rejected. */
+  @Test(expected = BucketIdException.class)
+  public void insertWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.insert(UNPARTITIONED, RECORD);
+  }
+
+  /** An update whose record identifier bucket differs from the computed bucket is rejected. */
+  @Test(expected = BucketIdException.class)
+  public void updateWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+  }
+
+  /**
+   * Unlike insert and update, a delete with the same bucket mismatch is expected to complete without an exception;
+   * the test passes simply by not throwing.
+   */
+  @Test
+  public void deleteWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.delete(UNPARTITIONED, RECORD);
+  }
+
+  /** Closing a coordinator that never processed a record does not touch any mutator. */
+  @Test
+  public void closeNoRecords() throws Exception {
+    coordinator.close();
+
+    // No mutator created
+    verifyZeroInteractions(mockMutator);
+  }
+
+  /** Closing a coordinator that has processed a record closes the mutator it was using. */
+  @Test
+  public void closeUsedCoordinator() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.close();
+
+    verify(mockMutator).close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
new file mode 100644
index 0000000..b29c763
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
@@ -0,0 +1,99 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/** Unit tests for {@link MutatorImpl}, run against a mocked {@link AcidOutputFormat} and {@link RecordUpdater}. */
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorImpl {
+
+  private static final long TRANSACTION_ID = 1L;
+  private static final int BUCKET_ID = 0;
+  private static final int RECORD_ID_COLUMN = 2;
+  private static final Path PATH = new Path("X");
+  private static final Object RECORD = new Object();
+
+  @Mock
+  private AcidOutputFormat<?, ?> mockOutputFormat;
+  @Mock
+  private ObjectInspector mockObjectInspector;
+  @Mock
+  private RecordUpdater mockRecordUpdater;
+  @Captor
+  private ArgumentCaptor<AcidOutputFormat.Options> captureOptions;
+
+  private final HiveConf configuration = new HiveConf();
+
+  private Mutator mutator;
+
+  /** Stubs the output format to hand back the mock updater, then constructs the mutator under test. */
+  @Before
+  public void injectMocks() throws IOException {
+    when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+    mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID,
+        PATH, BUCKET_ID);
+  }
+
+  /** Checks the {@link Options} that were passed to the output format when the record updater was requested. */
+  @Test
+  public void testCreatesRecordReader() throws IOException {
+    verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture());
+
+    Options captured = captureOptions.getValue();
+    assertThat(captured.getBucket(), is(BUCKET_ID));
+    assertThat(captured.getConfiguration(), is((Configuration) configuration));
+    assertThat(captured.getInspector(), is(mockObjectInspector));
+    assertThat(captured.getRecordIdColumn(), is(RECORD_ID_COLUMN));
+    assertThat(captured.getMinimumTransactionId(), is(TRANSACTION_ID));
+    assertThat(captured.getMaximumTransactionId(), is(TRANSACTION_ID));
+  }
+
+  /** An insert is forwarded to the record updater together with the transaction id. */
+  @Test
+  public void testInsertDelegates() throws IOException {
+    mutator.insert(RECORD);
+
+    verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD);
+  }
+
+  /** An update is forwarded to the record updater together with the transaction id. */
+  @Test
+  public void testUpdateDelegates() throws IOException {
+    mutator.update(RECORD);
+
+    verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD);
+  }
+
+  /** A delete is forwarded to the record updater together with the transaction id. */
+  @Test
+  public void testDeleteDelegates() throws IOException {
+    mutator.delete(RECORD);
+
+    verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD);
+  }
+
+  /** Closing the mutator calls {@code close(false)} on the record updater. */
+  @Test
+  public void testCloseDelegates() throws IOException {
+    mutator.close();
+
+    verify(mockRecordUpdater).close(false);
+  }
+
+  /** Flushing the mutator must not call {@code flush()} on the record updater. */
+  @Test
+  public void testFlushDoesNothing() throws IOException {
+    mutator.flush();
+
+    verify(mockRecordUpdater, never()).flush();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
new file mode 100644
index 0000000..389ad33
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
@@ -0,0 +1,31 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+/** Unit tests for {@link RecordInspectorImpl}. */
+public class TestRecordInspectorImpl {
+
+  private static final int ROW_ID_COLUMN = 2;
+
+  private RecordInspectorImpl inspector = new RecordInspectorImpl(
+      ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA),
+      ROW_ID_COLUMN);
+
+  /** The identifier stored in a {@link MutableRecord} is returned by the inspector. */
+  @Test
+  public void testExtractRecordIdentifier() {
+    RecordIdentifier expected = new RecordIdentifier(10L, 4, 20L);
+    MutableRecord record = new MutableRecord(1, "hello", expected);
+
+    RecordIdentifier extracted = inspector.extractRecordIdentifier(record);
+
+    assertThat(extracted, is(expected));
+  }
+
+  /** Constructing the inspector with a primitive (boolean) object inspector must be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testNotAStructObjectInspector() {
+    new RecordInspectorImpl(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, ROW_ID_COLUMN);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
new file mode 100644
index 0000000..33f9606
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
@@ -0,0 +1,91 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.junit.Test;
+
+/** Unit tests for {@link SequenceValidator}; every identifier is created with the same bucket id. */
+public class TestSequenceValidator {
+
+  private static final int BUCKET_ID = 1;
+
+  private SequenceValidator validator = new SequenceValidator();
+
+  /** Feeds one identifier to the validator and asserts the reported in-sequence flag. */
+  private void assertInSequence(long transactionId, long rowId, boolean expected) {
+    assertThat(validator.isInSequence(new RecordIdentifier(transactionId, BUCKET_ID, rowId)), is(expected));
+  }
+
+  @Test
+  public void testSingleInSequence() {
+    assertInSequence(0L, 0, true);
+  }
+
+  @Test
+  public void testRowIdInSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(0L, 1, true);
+    assertInSequence(0L, 4, true);
+  }
+
+  @Test
+  public void testTxIdInSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(1L, 0, true);
+    assertInSequence(4L, 0, true);
+  }
+
+  @Test
+  public void testMixedInSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(0L, 1, true);
+    assertInSequence(1L, 0, true);
+    assertInSequence(1L, 1, true);
+  }
+
+  @Test
+  public void testNegativeTxId() {
+    assertInSequence(-1L, 0, true);
+    assertInSequence(0L, 0, true);
+  }
+
+  @Test
+  public void testNegativeRowId() {
+    assertInSequence(0L, -1, true);
+    assertInSequence(0L, 0, true);
+  }
+
+  @Test
+  public void testRowIdOutOfSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(0L, 4, true);
+    assertInSequence(0L, 1, false);
+  }
+
+  @Test
+  public void testReset() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(0L, 4, true);
+    // New partition for example
+    validator.reset();
+    assertInSequence(0L, 1, true);
+  }
+
+  @Test
+  public void testTxIdOutOfSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(4L, 0, true);
+    assertInSequence(1L, 0, false);
+  }
+
+  @Test
+  public void testMixedOutOfSequence() {
+    assertInSequence(0L, 0, true);
+    assertInSequence(1L, 4, true);
+    assertInSequence(1L, 0, false);
+    assertInSequence(1L, 5, true);
+    assertInSequence(0L, 6, false);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullRecordIdentifier() {
+    validator.isInSequence(null);
+  }
+
+}


[3/3] hive git commit: HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates)

Posted by ga...@apache.org.
HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/994d98c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/994d98c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/994d98c0

Branch: refs/heads/master
Commit: 994d98c0963ee48c2abbfee6f389d75c0223c8f1
Parents: 3991dba
Author: Alan Gates <ga...@hortonworks.com>
Authored: Tue Jun 30 14:59:55 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Tue Jun 30 14:59:55 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 hcatalog/streaming/pom.xml                      |   6 +
 .../streaming/mutate/HiveConfFactory.java       |  63 +++
 .../mutate/UgiMetaStoreClientFactory.java       | 102 ++++
 .../streaming/mutate/client/AcidTable.java      | 112 ++++
 .../mutate/client/AcidTableSerializer.java      | 100 ++++
 .../mutate/client/ClientException.java          |  15 +
 .../mutate/client/ConnectionException.java      |  15 +
 .../streaming/mutate/client/MutatorClient.java  | 140 +++++
 .../mutate/client/MutatorClientBuilder.java     | 115 ++++
 .../streaming/mutate/client/TableType.java      |  37 ++
 .../streaming/mutate/client/Transaction.java    | 114 ++++
 .../mutate/client/TransactionException.java     |  15 +
 .../mutate/client/lock/HeartbeatFactory.java    |  30 +
 .../mutate/client/lock/HeartbeatTimerTask.java  |  66 +++
 .../streaming/mutate/client/lock/Lock.java      | 282 ++++++++++
 .../mutate/client/lock/LockException.java       |  15 +
 .../mutate/client/lock/LockFailureListener.java |  26 +
 .../mutate/doc-files/system-overview.dot        |  27 +
 .../hive/hcatalog/streaming/mutate/package.html | 495 +++++++++++++++++
 .../mutate/worker/BucketIdException.java        |  11 +
 .../mutate/worker/BucketIdResolver.java         |  11 +
 .../mutate/worker/BucketIdResolverImpl.java     |  76 +++
 .../mutate/worker/CreatePartitionHelper.java    |  83 +++
 .../mutate/worker/GroupRevisitedException.java  |  11 +
 .../mutate/worker/GroupingValidator.java        |  74 +++
 .../streaming/mutate/worker/Mutator.java        |  21 +
 .../mutate/worker/MutatorCoordinator.java       | 281 ++++++++++
 .../worker/MutatorCoordinatorBuilder.java       |  76 +++
 .../streaming/mutate/worker/MutatorFactory.java |  16 +
 .../streaming/mutate/worker/MutatorImpl.java    |  84 +++
 .../streaming/mutate/worker/OperationType.java  |   7 +
 .../worker/PartitionCreationException.java      |  15 +
 .../mutate/worker/RecordInspector.java          |  11 +
 .../mutate/worker/RecordInspectorImpl.java      |  45 ++
 .../mutate/worker/RecordSequenceException.java  |  11 +
 .../mutate/worker/SequenceValidator.java        |  49 ++
 .../mutate/worker/WorkerException.java          |  15 +
 .../streaming/mutate/ExampleUseCase.java        |  82 +++
 .../streaming/mutate/MutableRecord.java         |  50 ++
 .../mutate/ReflectiveMutatorFactory.java        |  51 ++
 .../streaming/mutate/StreamingAssert.java       | 191 +++++++
 .../streaming/mutate/StreamingTestUtils.java    | 261 +++++++++
 .../streaming/mutate/TestMutations.java         | 544 +++++++++++++++++++
 .../mutate/client/TestAcidTableSerializer.java  |  66 +++
 .../mutate/client/TestMutatorClient.java        | 176 ++++++
 .../mutate/client/TestTransaction.java          |  95 ++++
 .../client/lock/TestHeartbeatTimerTask.java     | 100 ++++
 .../streaming/mutate/client/lock/TestLock.java  | 283 ++++++++++
 .../mutate/worker/TestBucketIdResolverImpl.java |  38 ++
 .../mutate/worker/TestGroupingValidator.java    |  70 +++
 .../mutate/worker/TestMutatorCoordinator.java   | 234 ++++++++
 .../mutate/worker/TestMutatorImpl.java          |  99 ++++
 .../mutate/worker/TestRecordInspectorImpl.java  |  31 ++
 .../mutate/worker/TestSequenceValidator.java    |  91 ++++
 55 files changed, 5135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c5decaf..4d341a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ hcatalog/webhcat/java-client/target
 hcatalog/storage-handlers/hbase/target
 hcatalog/webhcat/svr/target
 conf/hive-default.xml.template
+.DS_Store

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 2135e89..6d03ce1 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -89,6 +89,12 @@
       <optional>true</optional>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <optional>true</optional>
+      <version>3.3.2</version>
+    </dependency>
 
     <!-- test -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
new file mode 100644
index 0000000..fcf446c
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
@@ -0,0 +1,63 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates/configures {@link HiveConf} instances with required ACID attributes. */
+public class HiveConfFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveConfFactory.class);
+  private static final String TRANSACTION_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  /**
+   * Returns a {@link HiveConf} carrying the settings applied by {@link #overrideSettings(HiveConf)}.
+   * <p>
+   * If {@code configuration} is already a {@link HiveConf} it is modified in place and returned. If it is another
+   * {@link Configuration} it is wrapped in a new {@link HiveConf}. If it is {@code null} a fresh instance is created
+   * via {@link #newInstance(Class, String)}. In every case a non-null {@code metaStoreUri} is written to the result;
+   * previously it was silently dropped whenever a configuration was supplied.
+   *
+   * @param configuration base configuration, may be {@code null}
+   * @param clazz class handed to the {@link HiveConf} constructor
+   * @param metaStoreUri meta store URI to set, or {@code null} to leave the existing value untouched
+   */
+  public static HiveConf newInstance(Configuration configuration, Class<?> clazz, String metaStoreUri) {
+    if (configuration == null) {
+      return newInstance(clazz, metaStoreUri);
+    }
+    HiveConf hiveConf = configuration instanceof HiveConf ? (HiveConf) configuration
+        : new HiveConf(configuration, clazz);
+    if (metaStoreUri != null) {
+      setHiveConf(hiveConf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    overrideSettings(hiveConf);
+    return hiveConf;
+  }
+
+  /**
+   * Creates a new {@link HiveConf} for {@code clazz}, sets the meta store URI when one is given and applies
+   * {@link #overrideSettings(HiveConf)}.
+   */
+  public static HiveConf newInstance(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    if (metaStoreUri != null) {
+      setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    overrideSettings(conf);
+    return conf;
+  }
+
+  /**
+   * Forces the transaction manager to {@code DbTxnManager}, enables concurrency support and
+   * {@code METASTORE_EXECUTE_SET_UGI}, and selects the {@code mr} execution engine on the supplied instance.
+   */
+  public static void overrideSettings(HiveConf conf) {
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, TRANSACTION_MANAGER);
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    // Avoids creating Tez Client sessions internally as it takes much longer currently
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+  }
+
+  /** Logs and applies a string setting; parameterized logging makes an explicit debug-level guard unnecessary. */
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+    LOG.debug("Overriding HiveConf setting : {} = {}", var, value);
+    conf.setVar(var, value);
+  }
+
+  /** Logs and applies a boolean setting; parameterized logging makes an explicit debug-level guard unnecessary. */
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+    LOG.debug("Overriding HiveConf setting : {} = {}", var, value);
+    conf.setBoolVar(var, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
new file mode 100644
index 0000000..2a4ddbe
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
@@ -0,0 +1,102 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import com.google.common.reflect.AbstractInvocationHandler;
+
+/**
+ * Creates a proxied {@link IMetaStoreClient client} that wraps calls in a {@link PrivilegedExceptionAction} if the
+ * {@link UserGroupInformation} is specified. Invokes directly otherwise.
+ */
+public class UgiMetaStoreClientFactory {
+
+  private static final Set<Method> I_META_STORE_CLIENT_METHODS = getIMetaStoreClientMethods();
+
+  private final String metaStoreUri;
+  private final HiveConf conf;
+  private final boolean secureMode;
+  private final UserGroupInformation authenticatedUser;
+  private final String user;
+
+  /**
+   * Stores the arguments and updates the supplied {@code conf} in place: the meta store URI is set when
+   * {@code metaStoreUri} is non-null and Thrift SASL is enabled when {@code secureMode} is {@code true}.
+   */
+  public UgiMetaStoreClientFactory(String metaStoreUri, HiveConf conf, UserGroupInformation authenticatedUser,
+      String user, boolean secureMode) {
+    this.metaStoreUri = metaStoreUri;
+    this.conf = conf;
+    this.authenticatedUser = authenticatedUser;
+    this.user = user;
+    this.secureMode = secureMode;
+    if (metaStoreUri != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    if (secureMode) {
+      conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+    }
+  }
+
+  /** Returns a proxy around a new {@link HiveMetaStoreClient} built from this factory's configuration. */
+  public IMetaStoreClient newInstance() throws MetaException {
+    return newInstance(new HiveMetaStoreClient(conf));
+  }
+
+  /** Returns a proxy around the supplied {@code delegate}. */
+  public IMetaStoreClient newInstance(IMetaStoreClient delegate) throws MetaException {
+    return createProxy(delegate, user, authenticatedUser);
+  }
+
+  @Override
+  public String toString() {
+    return "UgiMetaStoreClientFactory [metaStoreUri=" + metaStoreUri + ", secureMode=" + secureMode
+        + ", authenticatedUser=" + authenticatedUser + ", user=" + user + "]";
+  }
+
+  /**
+   * Builds the dynamic proxy. Methods declared on {@link IMetaStoreClient} are executed inside
+   * {@code authenticatedUser.doAs(...)} when a user is present; everything else is invoked directly on the delegate.
+   * Exceptions raised by the delegate are rethrown without their reflection wrappers.
+   */
+  private IMetaStoreClient createProxy(final IMetaStoreClient delegate, final String user,
+      final UserGroupInformation authenticatedUser) {
+    InvocationHandler handler = new AbstractInvocationHandler() {
+
+      @Override
+      protected Object handleInvocation(Object proxy, final Method method, final Object[] args) throws Throwable {
+        try {
+          if (!I_META_STORE_CLIENT_METHODS.contains(method) || authenticatedUser == null) {
+            return method.invoke(delegate, args);
+          }
+          try {
+            return authenticatedUser.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                return method.invoke(delegate, args);
+              }
+            });
+          } catch (InterruptedException e) {
+            // Preserve the interrupt for callers further up the stack before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new TException("PrivilegedExceptionAction failed as user '" + user + "'.", e);
+          } catch (IOException e) {
+            throw new TException("PrivilegedExceptionAction failed as user '" + user + "'.", e);
+          }
+        } catch (UndeclaredThrowableException | InvocationTargetException e) {
+          throw unwrap(e);
+        }
+      }
+    };
+
+    ClassLoader classLoader = IMetaStoreClient.class.getClassLoader();
+    Class<?>[] interfaces = new Class<?>[] { IMetaStoreClient.class };
+    Object proxy = Proxy.newProxyInstance(classLoader, interfaces, handler);
+    return IMetaStoreClient.class.cast(proxy);
+  }
+
+  /**
+   * Strips every leading {@link UndeclaredThrowableException} and {@link InvocationTargetException} layer. In the
+   * {@code doAs} path the delegate's exception arrives wrapped twice (an {@link InvocationTargetException} from the
+   * reflective call inside an {@link UndeclaredThrowableException}), so removing a single layer would surface the
+   * reflection wrapper rather than the real failure. A wrapper without a cause is returned as is.
+   */
+  private static Throwable unwrap(Throwable throwable) {
+    Throwable current = throwable;
+    while ((current instanceof UndeclaredThrowableException || current instanceof InvocationTargetException)
+        && current.getCause() != null) {
+      current = current.getCause();
+    }
+    return current;
+  }
+
+  private static Set<Method> getIMetaStoreClientMethods() {
+    return new HashSet<>(Arrays.asList(IMetaStoreClient.class.getDeclaredMethods()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
new file mode 100644
index 0000000..20747db
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
@@ -0,0 +1,112 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Describes an ACID table that can receive mutation events. Used to encode the information required by workers to write
+ * ACID events without requiring them to once more retrieve the data from the meta store db.
+ */
+public class AcidTable implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String databaseName;
+  private final String tableName;
+  private final boolean createPartitions;
+  private final TableType tableType;
+  private long transactionId;
+
+  private Table table;
+
+  AcidTable(String databaseName, String tableName, boolean createPartitions, TableType tableType) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.createPartitions = createPartitions;
+    this.tableType = tableType;
+  }
+
+  /**
+   * Returns {@code 0} until such a time that a {@link Transaction} has been acquired (when
+   * {@link MutatorClient#newTransaction()} exits), at which point this will return the
+   * {@link Transaction#getTransactionId() transaction id}.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** Returns the {@code createPartitions} flag this instance was constructed with. */
+  public boolean createPartitions() {
+    return createPartitions;
+  }
+
+  /**
+   * Returns {@code null} until such a time that the table described by the {@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then return the corresponding
+   * {@link StorageDescriptor#getOutputFormat() OutputFormat}.
+   */
+  public String getOutputFormatName() {
+    return table != null ? table.getSd().getOutputFormat() : null;
+  }
+
+  /**
+   * Returns {@code 0} until such a time that the table described by the {@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then return the corresponding
+   * {@link StorageDescriptor#getNumBuckets() total bucket count}.
+   */
+  public int getTotalBuckets() {
+    return table != null ? table.getSd().getNumBuckets() : 0;
+  }
+
+  public TableType getTableType() {
+    return tableType;
+  }
+
+  /**
+   * Returns the upper-cased {@code database.table} name. The conversion uses {@link java.util.Locale#ROOT} so that the
+   * result does not change with the JVM's default locale (for example the dotted capital I produced under Turkish
+   * locale rules).
+   */
+  public String getQualifiedName() {
+    return (databaseName + "." + tableName).toUpperCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * Returns {@code null} until such a time that the table described by the {@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then return the corresponding {@link Table}.
+   * Provided as a convenience to API users who may wish to gather further meta data regarding the table without
+   * connecting with the meta store once more.
+   */
+  public Table getTable() {
+    return table;
+  }
+
+  void setTransactionId(long transactionId) {
+    this.transactionId = transactionId;
+  }
+
+  /**
+   * Attaches the resolved meta store table.
+   *
+   * @throws IllegalArgumentException if the table's database or table name does not match this instance (compared
+   *           ignoring case)
+   */
+  void setTable(Table table) {
+    if (!databaseName.equalsIgnoreCase(table.getDbName())) {
+      throw new IllegalArgumentException("Incorrect database name.");
+    }
+    if (!tableName.equalsIgnoreCase(table.getTableName())) {
+      throw new IllegalArgumentException("Incorrect table name.");
+    }
+    this.table = table;
+  }
+
+  @Override
+  public String toString() {
+    return "AcidTable [databaseName=" + databaseName + ", tableName=" + tableName + ", createPartitions="
+        + createPartitions + ", tableType=" + tableType + ", outputFormatName=" + getOutputFormatName()
+        + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + transactionId + "]";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
new file mode 100644
index 0000000..5d8a2bf
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
@@ -0,0 +1,100 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to serialize/deserialize {@link AcidTable AcidTables} into strings so that they can be easily transported as
+ * {@link Configuration} properties.
+ */
+public class AcidTableSerializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AcidTableSerializer.class);
+
+  /* Allow for improved schemes. */
+  private static final String PROLOG_V1 = "AcidTableV1:";
+
+  /**
+   * Returns a base 64 encoded representation of the supplied {@link AcidTable}, prefixed with the version prolog.
+   * <p>
+   * The payload holds, in order: database name, table name, create-partitions flag, transaction id, table type id,
+   * the byte length of the Thrift-serialized meta store table ({@code 0} when absent) and then those bytes.
+   *
+   * @throws IOException if writing fails or the meta store table cannot be serialized
+   */
+  public static String encode(AcidTable table) throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    // try-with-resources closes the stream without masking an exception thrown while writing.
+    try (DataOutputStream data = new DataOutputStream(bytes)) {
+      data.writeUTF(table.getDatabaseName());
+      data.writeUTF(table.getTableName());
+      data.writeBoolean(table.createPartitions());
+      if (table.getTransactionId() <= 0) {
+        LOG.warn("Transaction ID <= 0. The recipient is probably expecting a transaction ID.");
+      }
+      data.writeLong(table.getTransactionId());
+      data.writeByte(table.getTableType().getId());
+
+      Table metaTable = table.getTable();
+      if (metaTable != null) {
+        byte[] thrift = new TSerializer(new TCompactProtocol.Factory()).serialize(metaTable);
+        data.writeInt(thrift.length);
+        data.write(thrift);
+      } else {
+        LOG.warn("Meta store table is null. The recipient is probably expecting an instance.");
+        data.writeInt(0);
+      }
+    } catch (TException e) {
+      throw new IOException("Error serializing meta store table.", e);
+    }
+
+    return PROLOG_V1 + new String(Base64.encodeBase64(bytes.toByteArray()), Charset.forName("UTF-8"));
+  }
+
+  /**
+   * Returns the {@link AcidTable} instance decoded from a base 64 representation produced by
+   * {@link #encode(AcidTable)}.
+   *
+   * @throws IllegalStateException if {@code encoded} does not start with the expected version prolog
+   * @throws IOException if the payload is truncated or the meta store table cannot be deserialized
+   */
+  public static AcidTable decode(String encoded) throws IOException {
+    if (!encoded.startsWith(PROLOG_V1)) {
+      throw new IllegalStateException("Unsupported version.");
+    }
+    byte[] decoded = Base64.decodeBase64(encoded.substring(PROLOG_V1.length()));
+
+    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(decoded))) {
+      String databaseName = in.readUTF();
+      String tableName = in.readUTF();
+      boolean createPartitions = in.readBoolean();
+      long transactionId = in.readLong();
+      TableType tableType = TableType.valueOf(in.readByte());
+      int thriftLength = in.readInt();
+
+      AcidTable table = new AcidTable(databaseName, tableName, createPartitions, tableType);
+      table.setTransactionId(transactionId);
+
+      // A length of zero means the sender had no meta store table to attach.
+      if (thriftLength > 0) {
+        byte[] thriftEncoded = new byte[thriftLength];
+        in.readFully(thriftEncoded, 0, thriftLength);
+        Table metaTable = new Table();
+        try {
+          new TDeserializer(new TCompactProtocol.Factory()).deserialize(metaTable, thriftEncoded);
+        } catch (TException e) {
+          throw new IOException("Error deserializing meta store table.", e);
+        }
+        table.setTable(metaTable);
+      }
+      return table;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
new file mode 100644
index 0000000..988dc38
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+/**
+ * Checked exception that is the common supertype of {@link ConnectionException} and {@link TransactionException}.
+ * Instances can only be created from within this package.
+ */
+public class ClientException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of the failure
+   */
+  ClientException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param cause the underlying error that triggered this failure
+   */
+  ClientException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
new file mode 100644
index 0000000..b54455a
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+/**
+ * {@link ClientException} specialization used for connection-related failures. Instances can only be created from
+ * within this package.
+ */
+public class ConnectionException extends ClientException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of the failure
+   */
+  ConnectionException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param cause the underlying error that triggered this failure
+   */
+  ConnectionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
new file mode 100644
index 0000000..2724525
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -0,0 +1,140 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for orchestrating {@link Transaction Transactions} within which ACID table mutation events can occur.
+ * Typically this will be a large batch of delta operations.
+ * <p>
+ * A client must be {@link #connect() connected} before {@link #newTransaction() transactions} can be created or the
+ * managed {@link #getTables() tables} interrogated, and should be {@link #close() closed} when no longer needed.
+ */
+public class MutatorClient implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MutatorClient.class);
+  /** Meta store table parameter that must parse as {@code true} for a sink table to be accepted. */
+  private static final String TRANSACTIONAL_PARAM_KEY = "transactional";
+
+  private final IMetaStoreClient metaStoreClient;
+  private final Lock.Options lockOptions;
+  /** Unmodifiable snapshot of the tables supplied at construction time. */
+  private final List<AcidTable> tables;
+  private boolean connected;
+
+  /**
+   * Creates a client over the given meta store connection.
+   *
+   * @param metaStoreClient meta store connection used for table checks and transactions; closed by {@link #close()}
+   * @param configuration configuration passed on to the {@link Lock.Options}
+   * @param lockFailureListener notified of lock failures; {@code null} selects
+   *          {@link LockFailureListener#NULL_LISTENER}
+   * @param user user name passed on to the {@link Lock.Options}
+   * @param tables tables managed by this client; the collection is copied
+   */
+  MutatorClient(IMetaStoreClient metaStoreClient, HiveConf configuration, LockFailureListener lockFailureListener,
+      String user, Collection<AcidTable> tables) {
+    this.metaStoreClient = metaStoreClient;
+    this.tables = Collections.unmodifiableList(new ArrayList<>(tables));
+
+    lockOptions = new Lock.Options()
+        .configuration(configuration)
+        .lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener)
+        .user(user);
+    // Iterate the snapshot so that the lock options and the managed tables always describe the same set.
+    for (AcidTable table : this.tables) {
+      lockOptions.addTable(table.getDatabaseName(), table.getTableName());
+    }
+  }
+
+  /**
+   * Connects to the {@link IMetaStoreClient meta store} that will be used to manage {@link Transaction} life-cycles.
+   * Also checks that the tables destined to receive mutation events are able to do so. The client should only hold one
+   * open transaction at any given time (TODO: enforce this).
+   *
+   * @throws ConnectionException if the client is already connected, or a managed table is missing, unreachable, or
+   *           (for sink tables) not transactional or not bucketed
+   */
+  public void connect() throws ConnectionException {
+    if (connected) {
+      throw new ConnectionException("Already connected.");
+    }
+    for (AcidTable table : tables) {
+      checkTable(metaStoreClient, table);
+    }
+    LOG.debug("Connected to end point {}", metaStoreClient);
+    connected = true;
+  }
+
+  /**
+   * Creates a new {@link Transaction} by opening a transaction with the {@link IMetaStoreClient meta store}. The new
+   * transaction's id is recorded on every managed {@link AcidTable}.
+   *
+   * @throws TransactionException if the client is not connected or the transaction could not be created
+   */
+  public Transaction newTransaction() throws TransactionException {
+    if (!connected) {
+      throw new TransactionException("Not connected - cannot create transaction.");
+    }
+    Transaction transaction = new Transaction(metaStoreClient, lockOptions);
+    for (AcidTable table : tables) {
+      table.setTransactionId(transaction.getTransactionId());
+    }
+    LOG.debug("Created transaction {}", transaction);
+    return transaction;
+  }
+
+  /** Did the client connect successfully. Note that the client may have since become disconnected. */
+  public boolean isConnected() {
+    return connected;
+  }
+
+  /**
+   * Closes the client releasing any {@link IMetaStoreClient meta store} connections held. Does not notify any open
+   * transactions (TODO: perhaps it should?)
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      metaStoreClient.close();
+      LOG.debug("Closed client.");
+    } finally {
+      // Whatever happened to the underlying connection, this client must no longer report itself as connected.
+      connected = false;
+    }
+  }
+
+  /**
+   * Returns the list of managed {@link AcidTable AcidTables} that can receive mutation events under the control of this
+   * client.
+   *
+   * @return an unmodifiable view of the managed tables
+   * @throws ConnectionException if the client is not connected
+   */
+  public List<AcidTable> getTables() throws ConnectionException {
+    if (!connected) {
+      throw new ConnectionException("Not connected - cannot interrogate tables.");
+    }
+    // Already wrapped as unmodifiable in the constructor; no need to wrap it a second time.
+    return tables;
+  }
+
+  @Override
+  public String toString() {
+    return "MutatorClient [metaStoreClient=" + metaStoreClient + ", connected=" + connected + "]";
+  }
+
+  /**
+   * Looks the table up in the meta store and, for {@link TableType#SINK} tables only, verifies that it is flagged as
+   * transactional and has at least one bucket before attaching the meta store definition to {@code acidTable}.
+   *
+   * @throws ConnectionException if the table does not exist, fails a sink check, or the meta store call fails
+   */
+  private void checkTable(IMetaStoreClient metaStoreClient, AcidTable acidTable) throws ConnectionException {
+    try {
+      LOG.debug("Checking table {}.", acidTable.getQualifiedName());
+      Table metaStoreTable = metaStoreClient.getTable(acidTable.getDatabaseName(), acidTable.getTableName());
+
+      if (acidTable.getTableType() == TableType.SINK) {
+        Map<String, String> parameters = metaStoreTable.getParameters();
+        // A table with no parameter map at all is treated as non-transactional rather than failing with an NPE.
+        boolean transactional = parameters != null
+            && Boolean.parseBoolean(parameters.get(TRANSACTIONAL_PARAM_KEY));
+        if (!transactional) {
+          throw new ConnectionException("Cannot stream to table that is not transactional: '"
+              + acidTable.getQualifiedName() + "'.");
+        }
+        int totalBuckets = metaStoreTable.getSd().getNumBuckets();
+        LOG.debug("Table {} has {} buckets.", acidTable.getQualifiedName(), totalBuckets);
+        if (totalBuckets <= 0) {
+          throw new ConnectionException("Cannot stream to table that has not been bucketed: '"
+              + acidTable.getQualifiedName() + "'.");
+        }
+
+        String outputFormat = metaStoreTable.getSd().getOutputFormat();
+        LOG.debug("Table {} has {} OutputFormat.", acidTable.getQualifiedName(), outputFormat);
+        acidTable.setTable(metaStoreTable);
+      }
+    } catch (NoSuchObjectException e) {
+      throw new ConnectionException("Invalid table '" + acidTable.getQualifiedName() + "'", e);
+    } catch (TException e) {
+      throw new ConnectionException("Error communicating with the meta store", e);
+    }
+    LOG.debug("Table {} OK.", acidTable.getQualifiedName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
new file mode 100644
index 0000000..6c21c59
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
@@ -0,0 +1,115 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
+import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+
+/** Convenience class for building {@link MutatorClient} instances. */
+public class MutatorClientBuilder {
+
+  /** Registered tables keyed by the upper-cased {@code database.table} name, so that lookups ignore case. */
+  private final Map<String, AcidTable> tables = new HashMap<>();
+  private HiveConf configuration;
+  private UserGroupInformation authenticatedUser;
+  private String metaStoreUri;
+  // NOTE(review): public and mutable, unlike the other builder state. Kept as-is for compatibility; prefer the
+  // lockFailureListener(LockFailureListener) method over direct field access.
+  public LockFailureListener lockFailureListener;
+
+  /** Sets the base configuration from which the client's configuration is derived in {@link #build()}. */
+  public MutatorClientBuilder configuration(HiveConf conf) {
+    this.configuration = conf;
+    return this;
+  }
+
+  /**
+   * Sets the user on whose behalf the client acts. When left unset, {@link #build()} falls back to the
+   * {@code user.name} system property and does not use secure mode.
+   */
+  public MutatorClientBuilder authenticatedUser(UserGroupInformation authenticatedUser) {
+    this.authenticatedUser = authenticatedUser;
+    return this;
+  }
+
+  /** Sets the meta store URI used when creating the configuration and the meta store client. */
+  public MutatorClientBuilder metaStoreUri(String metaStoreUri) {
+    this.metaStoreUri = metaStoreUri;
+    return this;
+  }
+
+  /** Set a listener to handle {@link Lock} failure events - highly recommended. */
+  public MutatorClientBuilder lockFailureListener(LockFailureListener lockFailureListener) {
+    this.lockFailureListener = lockFailureListener;
+    return this;
+  }
+
+  /**
+   * Adds a table of type {@link TableType#SOURCE} to be managed by this client. Partitions are never created
+   * automatically for source tables.
+   *
+   * @throws IllegalArgumentException if either name is {@code null} or the table has already been added
+   */
+  public MutatorClientBuilder addSourceTable(String databaseName, String tableName) {
+    addTable(databaseName, tableName, false, TableType.SOURCE);
+    return this;
+  }
+
+  /**
+   * Adds a mutation event destination (an ACID table) to be managed by this client, which is either unpartitioned or
+   * is not to have partitions created automatically.
+   *
+   * @throws IllegalArgumentException if either name is {@code null} or the table has already been added as a sink
+   */
+  public MutatorClientBuilder addSinkTable(String databaseName, String tableName) {
+    return addSinkTable(databaseName, tableName, false);
+  }
+
+  /**
+   * Adds a partitioned mutation event destination (an ACID table) to be managed by this client, where new partitions
+   * will be created as needed.
+   *
+   * @param createPartitions whether partitions should be created automatically
+   * @throws IllegalArgumentException if either name is {@code null} or the table has already been added as a sink
+   */
+  public MutatorClientBuilder addSinkTable(String databaseName, String tableName, boolean createPartitions) {
+    addTable(databaseName, tableName, createPartitions, TableType.SINK);
+    return this;
+  }
+
+  /**
+   * Registers a table under a case-insensitive {@code database.table} key. A table previously registered with a
+   * non-sink type may be re-registered as a sink, which replaces the earlier entry; any other duplicate registration is
+   * rejected.
+   */
+  private void addTable(String databaseName, String tableName, boolean createPartitions, TableType tableType) {
+    if (databaseName == null) {
+      throw new IllegalArgumentException("Database cannot be null");
+    }
+    if (tableName == null) {
+      throw new IllegalArgumentException("Table cannot be null");
+    }
+    // Locale.ROOT keeps the key independent of the JVM's default locale (e.g. the Turkish dotted/dotless 'i').
+    String key = (databaseName + "." + tableName).toUpperCase(java.util.Locale.ROOT);
+    AcidTable previous = tables.get(key);
+    if (previous != null) {
+      boolean upgradeToSink = tableType == TableType.SINK && previous.getTableType() != TableType.SINK;
+      if (!upgradeToSink) {
+        throw new IllegalArgumentException("Table has already been added: " + databaseName + "." + tableName);
+      }
+      tables.remove(key);
+    }
+
+    tables.put(key, new AcidTable(databaseName, tableName, createPartitions, tableType));
+  }
+
+  /**
+   * Builds the client. The builder's configuration field is replaced by the instance produced by
+   * {@link HiveConfFactory}, and a meta store client is created for the resolved user.
+   *
+   * @throws ClientException if the meta store client could not be created because of an I/O error
+   * @throws MetaException if raised while obtaining the underlying meta store client
+   */
+  public MutatorClient build() throws ClientException, MetaException {
+    String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
+    boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
+
+    configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
+
+    IMetaStoreClient metaStoreClient;
+    try {
+      metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode)
+          .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+    } catch (IOException e) {
+      throw new ClientException("Could not create meta store client.", e);
+    }
+
+    return new MutatorClient(metaStoreClient, configuration, lockFailureListener, user, tables.values());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
new file mode 100644
index 0000000..aa6d239
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
@@ -0,0 +1,37 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+/**
+ * The roles a table can be registered with, each carrying a compact byte id that can be mapped back to the constant
+ * with {@link #valueOf(byte)}.
+ */
+public enum TableType {
+  SOURCE((byte) 0),
+  SINK((byte) 1);
+
+  /** Lookup table from id to constant; position {@code i} holds the constant whose id is {@code i}. */
+  private static final TableType[] INDEX = buildIndex();
+
+  /**
+   * Builds the id-to-constant lookup table.
+   *
+   * @throws IllegalStateException if two constants declare the same id
+   */
+  private static TableType[] buildIndex() {
+    TableType[] index = new TableType[TableType.values().length];
+    for (TableType type : values()) {
+      byte position = type.getId();
+      if (index[position] != null) {
+        throw new IllegalStateException("Overloaded index: " + position);
+      }
+      index[position] = type;
+    }
+    return index;
+  }
+
+  /** Immutable id of this constant. */
+  private final byte id;
+
+  private TableType(byte id) {
+    this.id = id;
+  }
+
+  /** Returns the byte id of this constant. */
+  public byte getId() {
+    return id;
+  }
+
+  /**
+   * Returns the constant with the given id.
+   *
+   * @param id the id as returned by {@link #getId()}
+   * @throws IllegalArgumentException if {@code id} is negative or not less than the number of constants
+   */
+  public static TableType valueOf(byte id) {
+    if (id < 0 || id >= INDEX.length) {
+      throw new IllegalArgumentException("Invalid id: " + id);
+    }
+    return INDEX[id];
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
new file mode 100644
index 0000000..6532900
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
@@ -0,0 +1,114 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Transaction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
+
+  private final Lock lock;
+  private final IMetaStoreClient metaStoreClient;
+  private final long transactionId;
+
+  private TxnState state;
+
+  Transaction(IMetaStoreClient metaStoreClient, Lock.Options lockOptions) throws TransactionException {
+    this(metaStoreClient, new Lock(metaStoreClient, lockOptions));
+  }
+
+  /** Visible for testing only. */
+  Transaction(IMetaStoreClient metaStoreClient, Lock lock) throws TransactionException {
+    this.metaStoreClient = metaStoreClient;
+    this.lock = lock;
+    transactionId = open(lock.getUser());
+  }
+
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  public TxnState getState() {
+    return state;
+  }
+
+  /**
+   * Begin the transaction. Acquires a {@link Lock} for the transaction and {@link AcidTable AcidTables}.
+   */
+  public void begin() throws TransactionException {
+    try {
+      lock.acquire(transactionId);
+    } catch (LockException e) {
+      throw new TransactionException("Unable to acquire lock for transaction: " + transactionId, e);
+    }
+    state = TxnState.OPEN;
+    LOG.debug("Begin. Transaction id: {}", transactionId);
+  }
+
+  /** Commits the transaction. Releases the {@link Lock}. */
+  public void commit() throws TransactionException {
+    try {
+      lock.release();
+    } catch (LockException e) {
+      // This appears to leave the remove transaction in an inconsistent state but the heartbeat is now
+      // cancelled and it will eventually time out
+      throw new TransactionException("Unable to release lock: " + lock + " for transaction: " + transactionId, e);
+    }
+    try {
+      metaStoreClient.commitTxn(transactionId);
+      state = TxnState.COMMITTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionException("Invalid transaction id: " + transactionId, e);
+    } catch (TxnAbortedException e) {
+      throw new TransactionException("Aborted transaction cannot be committed: " + transactionId, e);
+    } catch (TException e) {
+      throw new TransactionException("Unable to commit transaction: " + transactionId, e);
+    }
+    LOG.debug("Committed. Transaction id: {}", transactionId);
+  }
+
+  /** Aborts the transaction. Releases the {@link Lock}. */
+  public void abort() throws TransactionException {
+    try {
+      lock.release();
+    } catch (LockException e) {
+      // This appears to leave the remove transaction in an inconsistent state but the heartbeat is now
+      // cancelled and it will eventually time out
+      throw new TransactionException("Unable to release lock: " + lock + " for transaction: " + transactionId, e);
+    }
+    try {
+      metaStoreClient.rollbackTxn(transactionId);
+      state = TxnState.ABORTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionException("Unable to abort invalid transaction id : " + transactionId, e);
+    } catch (TException e) {
+      throw new TransactionException("Unable to abort transaction id : " + transactionId, e);
+    }
+    LOG.debug("Aborted. Transaction id: {}", transactionId);
+  }
+
+  @Override
+  public String toString() {
+    return "Transaction [transactionId=" + transactionId + ", state=" + state + "]";
+  }
+
+  private long open(String user) throws TransactionException {
+    long transactionId = -1;
+    try {
+      transactionId = metaStoreClient.openTxn(user);
+      state = TxnState.INACTIVE;
+    } catch (TException e) {
+      throw new TransactionException("Unable to open transaction for user: " + user, e);
+    }
+    LOG.debug("Opened transaction with id: {}", transactionId);
+    return transactionId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
new file mode 100644
index 0000000..48fb1cf
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+/**
+ * {@link ClientException} specialization used for transaction-related failures. Instances can only be created from
+ * within this package.
+ */
+public class TransactionException extends ClientException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of the failure
+   */
+  TransactionException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param cause the underlying error that triggered this failure
+   */
+  TransactionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
new file mode 100644
index 0000000..5814d37
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
@@ -0,0 +1,30 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.Collection;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates a default {@link HeartbeatTimerTask} for {@link Lock Locks}. */
+class HeartbeatFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatFactory.class);
+
+  /** Creates a new {@link HeartbeatTimerTask} instance for the {@link Lock} and schedules it. */
+  Timer newInstance(IMetaStoreClient metaStoreClient, LockFailureListener listener, Long transactionId,
+      Collection<Table> tableDescriptors, long lockId, int heartbeatPeriod) {
+    Timer heartbeatTimer = new Timer("hive-lock-heartbeat[lockId=" + lockId + ", transactionId=" + transactionId + "]",
+        true);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(metaStoreClient, listener, transactionId, tableDescriptors, lockId);
+    heartbeatTimer.schedule(task, TimeUnit.SECONDS.toMillis(heartbeatPeriod),
+        TimeUnit.SECONDS.toMillis(heartbeatPeriod));
+
+    LOG.debug("Scheduled heartbeat timer task: {}", heartbeatTimer);
+    return heartbeatTimer;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
new file mode 100644
index 0000000..2446c10
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
@@ -0,0 +1,66 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.Collection;
+import java.util.TimerTask;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TimerTask} that sends {@link IMetaStoreClient#heartbeat(long, long) heartbeat} events to the
+ * {@link IMetaStoreClient meta store} to keet the {@link Lock} and {@link Transaction} alive. Nofifies the registered
+ * {@link LockFailureListener} should the lock fail.
+ */
+class HeartbeatTimerTask extends TimerTask {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatTimerTask.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final long lockId;
+  private final Long transactionId;
+  private final LockFailureListener listener;
+  private final Collection<Table> tableDescriptors;
+
+  HeartbeatTimerTask(IMetaStoreClient metaStoreClient, LockFailureListener listener, Long transactionId,
+      Collection<Table> tableDescriptors, long lockId) {
+    this.metaStoreClient = metaStoreClient;
+    this.listener = listener;
+    this.transactionId = transactionId;
+    this.tableDescriptors = tableDescriptors;
+    this.lockId = lockId;
+    LOG.debug("Reporting to listener {}", listener);
+  }
+
+  @Override
+  public void run() {
+    try {
+      // I'm assuming that there is no transaction ID for a read lock.
+      metaStoreClient.heartbeat(transactionId == null ? 0 : transactionId, lockId);
+      LOG.debug("Sent heartbeat for lock={}, transactionId={}", lockId, transactionId);
+    } catch (NoSuchLockException | NoSuchTxnException | TxnAbortedException e) {
+      failLock(e);
+    } catch (TException e) {
+      LOG.warn("Failed to send heartbeat to meta store.", e);
+    }
+  }
+
+  private void failLock(Exception e) {
+    LOG.debug("Lock " + lockId + " failed, cancelling heartbeat and notifiying listener: " + listener, e);
+    // Cancel the heartbeat
+    cancel();
+    listener.lockFailed(lockId, transactionId, Lock.asStrings(tableDescriptors), e);
+  }
+
+  @Override
+  public String toString() {
+    return "HeartbeatTimerTask [lockId=" + lockId + ", transactionId=" + transactionId + "]";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
new file mode 100644
index 0000000..21604df
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
@@ -0,0 +1,282 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the state required to safely read/write from/to an ACID table.
+ */
+public class Lock {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Lock.class);
+
+  private static final double HEARTBEAT_FACTOR = 0.75;
+  private static final int DEFAULT_HEARTBEAT_PERIOD = 275;
+
+  private final IMetaStoreClient metaStoreClient;
+  private final HeartbeatFactory heartbeatFactory;
+  private final LockFailureListener listener;
+  private final Collection<Table> tableDescriptors;
+  private final int lockRetries;
+  private final int retryWaitSeconds;
+  private final String user;
+  private final HiveConf hiveConf;
+
+  private Timer heartbeat;
+  private Long lockId;
+  private Long transactionId;
+
+  public Lock(IMetaStoreClient metaStoreClient, Options options) {
+    this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user,
+        options.descriptors, options.lockRetries, options.retryWaitSeconds);
+  }
+
+  /** Visible for testing only. */
+  Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, HiveConf hiveConf,
+      LockFailureListener listener, String user, Collection<Table> tableDescriptors, int lockRetries,
+      int retryWaitSeconds) {
+    this.metaStoreClient = metaStoreClient;
+    this.heartbeatFactory = heartbeatFactory;
+    this.hiveConf = hiveConf;
+    this.user = user;
+    this.tableDescriptors = tableDescriptors;
+    this.listener = listener;
+    this.lockRetries = lockRetries;
+    this.retryWaitSeconds = retryWaitSeconds;
+
+    if (LockFailureListener.NULL_LISTENER.equals(listener)) {
+      LOG.warn("No {} supplied. Data quality and availability cannot be assured.",
+          LockFailureListener.class.getSimpleName());
+    }
+  }
+
+  /** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */
+  public void acquire() throws LockException {
+    lockId = internalAcquire(null);
+    initiateHeartbeat();
+  }
+
+  /** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */
+  public void acquire(long transactionId) throws LockException {
+    lockId = internalAcquire(transactionId);
+    this.transactionId = transactionId;
+    initiateHeartbeat();
+  }
+
+  /** Attempts to release the read lock on the table. Throws an exception if the lock failed at any point. */
+  public void release() throws LockException {
+    if (heartbeat != null) {
+      heartbeat.cancel();
+    }
+    internalRelease();
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public String toString() {
+    return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId
+        + "]";
+  }
+
+  /**
+   * Requests a shared lock on all table descriptors, making up to {@code lockRetries} attempts and sleeping
+   * {@code retryWaitSeconds} after each attempt for which the meta store reports {@link LockState#WAITING}.
+   *
+   * @param transactionId the transaction to associate with the lock, or {@code null} for none
+   * @return the id of the acquired lock
+   * @throws LockException if the meta store call fails, the lock is refused, the retries are exhausted, or the
+   *           calling thread is interrupted while waiting between attempts
+   */
+  private long internalAcquire(Long transactionId) throws LockException {
+    int attempts = 0;
+    LockRequest request = buildSharedLockRequest(transactionId);
+    do {
+      LockResponse response = null;
+      try {
+        response = metaStoreClient.lock(request);
+      } catch (TException e) {
+        throw new LockException("Unable to acquire lock for tables: [" + join(tableDescriptors) + "]", e);
+      }
+      if (response != null) {
+        LockState state = response.getState();
+        if (state == LockState.NOT_ACQUIRED || state == LockState.ABORT) {
+          // I expect we'll only see NOT_ACQUIRED here?
+          break;
+        }
+        if (state == LockState.ACQUIRED) {
+          LOG.debug("Acquired lock {}", response.getLockid());
+          return response.getLockid();
+        }
+        if (state == LockState.WAITING) {
+          try {
+            Thread.sleep(TimeUnit.SECONDS.toMillis(retryWaitSeconds));
+          } catch (InterruptedException e) {
+            // Preserve the interrupt for callers and stop retrying: the thread has been asked to shut down.
+            Thread.currentThread().interrupt();
+            throw new LockException("Interrupted while waiting to acquire lock on tables: ["
+                + join(tableDescriptors) + "]", e);
+          }
+        }
+      }
+      attempts++;
+    } while (attempts < lockRetries);
+    throw new LockException("Could not acquire lock on tables: [" + join(tableDescriptors) + "]");
+  }
+
+  /**
+   * Unlocks the lock with the meta store when one is held outside of a transaction. A Thrift failure is logged and
+   * passed to the {@link LockFailureListener} rather than thrown.
+   */
+  private void internalRelease() {
+    // if there is a transaction then this lock will be released on commit/abort/rollback instead.
+    boolean releasedWithTransaction = transactionId != null;
+    if (lockId == null || releasedWithTransaction) {
+      return;
+    }
+    try {
+      metaStoreClient.unlock(lockId);
+      LOG.debug("Released lock {}", lockId);
+      lockId = null;
+    } catch (TException e) {
+      LOG.error("Lock " + lockId + " failed.", e);
+      listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), e);
+    }
+  }
+
+  /**
+   * Builds a lock request containing one shared lock component per table descriptor, for this lock's user.
+   *
+   * @param transactionId the transaction to attach to the request, or {@code null} to attach none
+   */
+  private LockRequest buildSharedLockRequest(Long transactionId) {
+    LockRequestBuilder builder = new LockRequestBuilder();
+    for (Table table : tableDescriptors) {
+      LockComponentBuilder componentBuilder = new LockComponentBuilder()
+          .setDbName(table.getDbName())
+          .setTableName(table.getTableName())
+          .setShared();
+      builder.addLockComponent(componentBuilder.build());
+    }
+    if (transactionId != null) {
+      builder.setTransactionId(transactionId);
+    }
+    return builder.setUser(user).build();
+  }
+
+  private void initiateHeartbeat() {
+    int heartbeatPeriod = getHeartbeatPeriod();
+    LOG.debug("Heartbeat period {}s", heartbeatPeriod);
+    heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tableDescriptors, lockId,
+        heartbeatPeriod);
+  }
+
+  /**
+   * Determines how often, in seconds, the heartbeat should be sent.
+   * <p>
+   * When a configuration with a transaction timeout is available, the period is that timeout (converted to seconds)
+   * scaled by {@code HEARTBEAT_FACTOR}, with a floor of one second. Otherwise {@code DEFAULT_HEARTBEAT_PERIOD} is
+   * used.
+   *
+   * @return the heartbeat period in seconds, always at least 1 when derived from the configuration
+   */
+  private int getHeartbeatPeriod() {
+    if (hiveConf == null) {
+      return DEFAULT_HEARTBEAT_PERIOD;
+    }
+    String txTimeout = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT);
+    if (txTimeout == null || txTimeout.trim().isEmpty()) {
+      return DEFAULT_HEARTBEAT_PERIOD;
+    }
+    // The timeout is a time-typed setting, so it may carry any supported unit suffix ("300s", "5m", ...) or none at
+    // all. Let HiveConf parse it (treating a bare number as seconds) rather than assuming a trailing 's', which
+    // would silently drop the last digit of "300" and misread "5m" as five seconds.
+    long txTimeoutSeconds = HiveConf.toTime(txTimeout, TimeUnit.SECONDS, TimeUnit.SECONDS);
+    // We want to send the heartbeat at an interval that is less than the timeout.
+    return Math.max(1, (int) (txTimeoutSeconds * HEARTBEAT_FACTOR));
+  }
+
+  /**
+   * Returns the id of the lock currently tracked by this instance. May be {@code null}, for example after the lock
+   * has been released outside of a transaction. Visible for testing only.
+   */
+  Long getLockId() {
+    return lockId;
+  }
+
+  /**
+   * Returns the id of the transaction this lock is associated with, or {@code null} if there is none. Visible for
+   * testing only.
+   */
+  Long getTransactionId() {
+    return transactionId;
+  }
+
+  /**
+   * Joins the given values into a single string using {@code ","} as the separator; used to build the table lists
+   * that appear in exception messages. Visible for testing only.
+   */
+  static String join(Iterable<? extends Object> values) {
+    return StringUtils.join(values, ",");
+  }
+
+  /**
+   * Converts table descriptors into their {@code database.table} names, preserving the iteration order of the
+   * supplied collection. Visible for testing only.
+   */
+  static List<String> asStrings(Collection<Table> tables) {
+    List<String> qualifiedNames = new ArrayList<>(tables.size());
+    for (Table table : tables) {
+      String qualifiedName = table.getDbName() + "." + table.getTableName();
+      qualifiedNames.add(qualifiedName);
+    }
+    return qualifiedNames;
+  }
+
+  /**
+   * Constructs lock options for a set of Hive ACID tables from which we wish to read. All setters validate their
+   * argument, throw an {@link IllegalArgumentException} describing the offending value when it is rejected, and
+   * return this instance so that calls can be chained.
+   */
+  public static final class Options {
+    Set<Table> descriptors = new LinkedHashSet<>();
+    LockFailureListener listener = LockFailureListener.NULL_LISTENER;
+    int lockRetries = 5;
+    int retryWaitSeconds = 30;
+    String user;
+    HiveConf hiveConf;
+
+    /**
+     * Adds a table for which a shared read lock will be requested.
+     *
+     * @throws IllegalArgumentException if either name is {@code null} or blank
+     */
+    public Options addTable(String databaseName, String tableName) {
+      checkNotNullOrEmpty(databaseName, "databaseName");
+      checkNotNullOrEmpty(tableName, "tableName");
+      Table table = new Table();
+      table.setDbName(databaseName);
+      table.setTableName(tableName);
+      descriptors.add(table);
+      return this;
+    }
+
+    /**
+     * Sets the user on whose behalf the lock is requested.
+     *
+     * @throws IllegalArgumentException if the user is {@code null} or blank
+     */
+    public Options user(String user) {
+      checkNotNullOrEmpty(user, "user");
+      this.user = user;
+      return this;
+    }
+
+    /**
+     * Sets the Hive configuration to use.
+     *
+     * @throws IllegalArgumentException if the configuration is {@code null}
+     */
+    public Options configuration(HiveConf hiveConf) {
+      checkNotNull(hiveConf, "hiveConf");
+      this.hiveConf = hiveConf;
+      return this;
+    }
+
+    /**
+     * Sets a listener to handle failures of locks that were previously acquired.
+     *
+     * @throws IllegalArgumentException if the listener is {@code null}
+     */
+    public Options lockFailureListener(LockFailureListener listener) {
+      checkNotNull(listener, "listener");
+      this.listener = listener;
+      return this;
+    }
+
+    /**
+     * Sets the number of lock retries; defaults to 5.
+     *
+     * @throws IllegalArgumentException if the value is not greater than zero
+     */
+    public Options lockRetries(int lockRetries) {
+      checkArgument(lockRetries > 0, "lockRetries must be greater than zero: " + lockRetries);
+      this.lockRetries = lockRetries;
+      return this;
+    }
+
+    /**
+     * Sets the retry wait in seconds; defaults to 30.
+     *
+     * @throws IllegalArgumentException if the value is not greater than zero
+     */
+    public Options retryWaitSeconds(int retryWaitSeconds) {
+      checkArgument(retryWaitSeconds > 0, "retryWaitSeconds must be greater than zero: " + retryWaitSeconds);
+      this.retryWaitSeconds = retryWaitSeconds;
+      return this;
+    }
+
+    /** Throws an {@link IllegalArgumentException} with the given message unless the condition holds. */
+    private static void checkArgument(boolean value, String message) {
+      if (!value) {
+        throw new IllegalArgumentException(message);
+      }
+    }
+
+    /** Rejects a {@code null} value, naming the offending argument in the exception message. */
+    private static void checkNotNull(Object value, String name) {
+      if (value == null) {
+        throw new IllegalArgumentException(name + " must not be null");
+      }
+    }
+
+    /** Rejects a {@code null}, empty or whitespace-only string, naming the offending argument in the message. */
+    private static void checkNotNullOrEmpty(String value, String name) {
+      if (StringUtils.isBlank(value)) {
+        throw new IllegalArgumentException(name + " must not be null or blank");
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
new file mode 100644
index 0000000..67ed601
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+/** Checked exception signalling a failure while working with a lock, for example when it could not be acquired. */
+public class LockException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception with a description of the failure only. */
+  public LockException(String message) {
+    super(message);
+  }
+
+  /** Creates an exception with a description of the failure and the underlying cause. */
+  public LockException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
new file mode 100644
index 0000000..2b6a12a
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
@@ -0,0 +1,26 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Provides a means to handle the situation when a held lock fails. */
+public interface LockFailureListener {
+
+  static final Logger LOG = LoggerFactory.getLogger(LockFailureListener.class);
+
+  /** Default listener that does nothing more than log the failure as a warning. */
+  static final LockFailureListener NULL_LISTENER = new LockFailureListener() {
+    @Override
+    public void lockFailed(long lockId, Long transactionId, Iterable<String> tableNames, Throwable t) {
+      LOG.warn(
+          "Ignored lock failure: lockId=" + lockId + ", transactionId=" + transactionId + ", tables=" + tableNames, t);
+    }
+
+    @Override
+    public String toString() {
+      return LockFailureListener.class.getName() + ".NULL_LISTENER";
+    }
+  };
+
+  /**
+   * Called when the specified lock has failed. You should probably abort your job in this case.
+   *
+   * @param lockId the id of the lock that failed
+   * @param transactionId the id of the associated transaction; may be {@code null}
+   * @param tableNames the names of the tables covered by the lock
+   * @param t the error that caused the failure
+   */
+  void lockFailed(long lockId, Long transactionId, Iterable<String> tableNames, Throwable t);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
new file mode 100644
index 0000000..79c30e7
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
@@ -0,0 +1,27 @@
+// Overview of how data, Hive components, user code and the mutation API components relate to one another.
+digraph "API Usage" {
+  nodesep=1.2;
+
+  // Data sets (gray ovals).
+  DATA [label="ACID\ndataset",shape=oval,style=filled,color="gray"];
+  CHANGES [label="Changed\ndata",shape=oval,style=filled,color="gray"];
+  
+  // Hive components (darkseagreen3 boxes).
+  META_STORE [label="Hive\nMetaStore",shape=box,style=filled,color="darkseagreen3"];
+  HIVE_CLI [label="Hive\nCLI",shape=box,style=filled,color="darkseagreen3"];	
+
+  // Steps labelled "your code" (khaki1 boxes) followed by the mutator/bucket components (lightblue boxes).
+  MERGE1 [label="Compute\nmutations\n(your code)",shape=box,style=filled,color="khaki1"];
+  SORT [label="Group\n& sort\n(your code)",shape=box,style=filled,color="khaki1"];
+  CLIENT [label="Mutator\nclient",shape=box,style=filled,color="lightblue"];
+  BUCKET [label="Bucket ID\nappender",shape=box,style=filled,color="lightblue"];
+  COORD [label="Mutator\ncoordinator",shape=box,style=filled,color="lightblue"]; 
+  // Configuration handed from the client to the other API components.
+  CLIENT -> COORD [label="Provides\nconf to"];
+  CLIENT -> BUCKET [label="Provides\nconf to"];
+  
+  // Transaction management and the flow of data from the inputs, through the merge and sort, to the data set.
+  CLIENT -> META_STORE [label="Manages\ntxns using"];
+  CHANGES -> MERGE1 [label="Reads ∆s\nfrom"];
+  DATA -> MERGE1 [label="Reads\nROW__IDs\nfrom"];
+  BUCKET -> MERGE1 [label="Appends ids\nto inserts"];
+  MERGE1 -> SORT;
+  SORT -> COORD [label="Issues\nmutations to"];
+  COORD -> DATA [label="Writes to"];
+  DATA -> HIVE_CLI [label="Read by"];
+  META_STORE -> DATA [label="Compacts"]; 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
new file mode 100644
index 0000000..9fc10b6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -0,0 +1,495 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming Mutation API">
+<meta name=Keywords content="HCatalog Streaming Mutation ACID">
+<meta http-equiv=Content-Type content="text/html; charset=utf-8">
+<title>HCatalog Streaming Mutation API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming Mutation API -- high level description</h1>
+
+<h2>Background</h2>
+<p>
+In certain data processing use cases it is necessary to modify existing
+data when new facts arrive. An example of this is the classic ETL merge
+where a copy of a data set is kept in sync with a master by the frequent
+application of deltas. The deltas describe the mutations (inserts,
+updates, deletes) that have occurred to the master since the previous
+sync. To implement such a case using Hadoop traditionally demands that
+the partitions containing records targeted by the mutations be
+rewritten. This is a coarse approach; a partition containing millions of
+records might be rebuilt because of a single record change. Additionally
+these partitions cannot be restated atomically; at some point the old
+partition data must be swapped with the new partition data. When this
+swap occurs, usually by issuing an HDFS
+<code>rm</code>
+followed by a
+<code>mv</code>
+, the possibility exists where the data appears to be unavailable and
+hence any downstream jobs consuming the data might unexpectedly fail.
+Therefore data processing patterns that restate raw data on HDFS cannot
+operate robustly without some external mechanism to orchestrate
+concurrent access to changing data.
+</p>
+
+<p>
+The availability of ACID tables in Hive provides a mechanism that both
+enables concurrent access to data stored in HDFS (so long as it's in the
+ORC+ACID format), and also permits row level mutations of records within
+a table, without the need to rewrite the existing data. But while Hive
+itself supports
+<code>INSERT</code>
+,
+<code>UPDATE</code>
+and
+<code>DELETE</code>
+commands, and the ORC format can support large batches of mutations in a
+transaction, Hive's execution engine currently submits each individual
+mutation operation in a separate transaction and issues table scans (M/R
+jobs) to execute them. It does not currently scale to the demands of
+processing large deltas in an atomic manner. Furthermore it would be
+advantageous to extend atomic batch mutation capabilities beyond Hive by
+making them available to other data processing frameworks. The Streaming
+Mutation API does just this.
+</p>
+
+<p>The Streaming Mutation API, although similar to the Streaming
+API, has a number of differences and is built to enable very different
+use cases. Superficially, the Streaming API can only write new data
+whereas the mutation API can also modify existing data. However the two
+APIs are also based on very different transaction models. The Streaming API
+focuses on surfacing a continuous stream of new data into a Hive table
+and does so by batching small sets of writes into multiple short-lived
+transactions. Conversely the mutation API is designed to infrequently
+apply large sets of mutations to a data set in an atomic fashion; all
+mutations will either be applied or they will not. This instead mandates
+the use of a single long-lived transaction. This table summarises the
+attributes of each API:</p>
+
+<table border="1">
+<thead>
+<tr>
+<th>Attribute</th>
+<th>Streaming API</th>
+<th>Mutation API</th>
+</tr>
+<tr>
+<td>Ingest type</td>
+<td>Data arrives continuously</td>
+<td>Ingests are performed periodically and the mutations are
+applied in a single batch</td>
+</tr>
+<tr>
+<td>Transaction scope</td>
+<td>Transactions are created for small batches of writes</td>
+<td>The entire set of mutations should be applied within a single
+transaction</td>
+</tr>
+<tr>
+<td>Data availability</td>
+<td>Surfaces new data to users frequently and quickly</td>
+<td>Change sets should be applied atomically, either the effect of
+the delta is visible or it is not</td>
+</tr>
+<tr>
+<td>Sensitive to record order</td>
+<td>No, records do not have pre-existing lastTxnIds or bucketIds.
+Records are likely being written into a single partition (today's date
+for example)</td>
+<td>Yes, all mutated records have existing <code>RecordIdentifiers</code>
+and must be grouped by (partitionValues, bucketId) and sorted by
+lastTxnId. These record coordinates initially arrive in an order that is
+effectively random.
+</td>
+</tr>
+<tr>
+<td>Impact of a write failure</td>
+<td>Transaction can be aborted and producer can choose to resubmit
+failed records as ordering is not important.</td>
+<td>Ingest for the respective must be halted and failed records
+resubmitted to preserve sequence.</td>
+</tr>
+<tr>
+<td>User perception of missing data</td>
+<td>Data has not arrived yet → "latency?"</td>
+<td>"This data is inconsistent, some records have been updated, but
+other related records have not" - consider here the classic transfer
+between bank accounts scenario</td>
+</tr>
+<tr>
+<td>API end point scope</td>
+<td>A given <code>HiveEndPoint</code> instance submits many
+transactions to a specific bucket, in a specific partition, of a
+specific table
+</td>
+<td>A set of <code>MutatorCoordinators</code> write changes to an
+unknown set of buckets, of an unknown set of partitions, of specific
+tables (can be more than one), within a single transaction.
+</td>
+</tr>
+</thead>
+</table>
+
+<h2>Structure</h2>
+<p>The API comprises two main concerns: transaction management, and
+the writing of mutation operations to the data set. The two concerns
+have a minimal coupling as it is expected that transactions will be
+initiated from a single job launcher type process while the writing of
+mutations will be scaled out across any number of worker nodes. In the
+context of Hadoop M/R these can be more concretely defined as the Tool
+and Map/Reduce task components. However, use of this architecture is not
+mandated and in fact both concerns could be handled within a single
+simple process depending on the requirements.</p>
+
+<p>Note that a suitably configured Hive instance is required to
+operate this system even if you do not intend to access the data from
+within Hive. Internally, transactions are managed by the Hive MetaStore.
+Mutations are performed to HDFS via ORC APIs that bypass the MetaStore.
+Additionally you may wish to configure your MetaStore instance to
+perform periodic data compactions.</p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the <b>org.apache.hive.hcatalog.streaming.mutate</b>
+Java package and included as the hive-hcatalog-streaming jar.
+</p>
+
+<h2>Data requirements</h2>
+<p>
+Generally speaking, to apply a mutation to a record one must have some
+unique key that identifies the record. However, primary keys are not a
+construct provided by Hive. Internally Hive uses
+<code>RecordIdentifiers</code>
+stored in a virtual
+<code>ROW__ID</code>
+column to uniquely identify records within an ACID table. Therefore,
+any process that wishes to issue mutations to a table via this API must
+have available the corresponding row ids for the target records. What
+this means in practice is that the process issuing mutations must first
+read in a current snapshot of the data and then join the mutations on some
+domain specific primary key to obtain the corresponding Hive
+<code>ROW__ID</code>
+. This is effectively what occurs within Hive's table scan process when
+an
+<code>UPDATE</code>
+or
+<code>DELETE</code>
+statement is executed. The
+<code>AcidInputFormat</code>
+provides access to this data via
+<code>AcidRecordReader.getRecordIdentifier()</code>
+.
+</p>
+
+<p>
+The implementation of the ACID format places some constraints on the
+order in which records are written and it is important that this
+ordering is enforced. Additionally, data must be grouped appropriately
+to adhere to the constraints imposed by the
+<code>OrcRecordUpdater</code>
+. Grouping also makes it possible to parallelise the writing of mutations
+for the purposes of scaling. Finally, to correctly bucket new records
+(inserts) there is a slightly unintuitive trick that must be applied.
+</p>
+
+<p>All of these data sequencing concerns are the responsibility of
+the client process calling the API which is assumed to have first class
+grouping and sorting capabilities (Hadoop Map/Reduce etc.) The streaming
+API provides nothing more than validators that fail fast when they
+encounter groups and records that are out of sequence.</p>
+
+<p>In short, API client processes should prepare data for the mutate
+API like so:</p>
+<ul>
+<li><b>MUST:</b> Order records by <code>ROW__ID.originalTxn</code>,
+then <code>ROW__ID.rowId</code>.</li>
+<li><b>MUST:</b> Assign a <code>ROW__ID</code> containing a
+computed <code>bucketId</code> to records to be inserted.</li>
+<li><b>SHOULD:</b> Group/partition by table partition value, then <code>ROW__ID.bucketId</code>.</li>
+</ul>
+
+<p>
+The addition of bucket ids to insert records prior to grouping and
+sorting seems unintuitive. However, it is required both to ensure
+adequate partitioning of new data and bucket allocation consistent with
+that provided by Hive. In a typical ETL the majority of mutation events
+are inserts, often targeting a single partition (new data for the
+previous day, hour, etc.). If more than one worker is writing said
+events, were we to leave the bucket id empty then all inserts would go
+to a single worker (e.g: reducer) and the workload could be heavily
+skewed. The assignment of a computed bucket allows inserts to be more
+usefully distributed across workers. Additionally, when Hive is working
+with the data it may expect records to have been bucketed in a way that
+is consistent with its own internal scheme. A convenience type and
+method is provided to more easily compute and append bucket ids:
+<code>BucketIdResolver</code>
+and
+<code>BucketIdResolverImpl</code>
+.
+</p>
+
+<p>Update operations should not attempt to modify values of
+partition or bucketing columns. The API does not prevent this and such
+attempts could lead to data corruption.</p>
+
+<h2>Streaming requirements</h2>
+<p>A few things are currently required to use streaming.</p>
+
+<p>
+<ol>
+<li>Currently, only ORC storage format is supported. So '<b>stored
+as orc</b>' must be specified during table creation.
+</li>
+<li>The hive table must be bucketed, but not sorted. So something
+like '<b>clustered by (<i>colName</i>) into <i>10</i> buckets
+</b>' must be specified during table creation.
+</li>
+<li>User of the client streaming process must have the necessary
+permissions to write to the table or partition and create partitions in
+the table.</li>
+<li>Settings required in hive-site.xml for Metastore:
+<ol>
+<li><b>hive.txn.manager =
+org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+<li><b>hive.support.concurrency = true </b></li>
+<li><b>hive.compactor.initiator.on = true</b></li>
+<li><b>hive.compactor.worker.threads &gt; 0 </b></li>
+</ol>
+</li>
+</ol>
+</p>
+
+<p>
+<b>Note:</b> Streaming mutations to <b>unpartitioned</b> tables is also
+supported.
+</p>
+
+<h2>Record layout</h2>
+<p>
+The structure, layout, and encoding of records is the exclusive concern
+of the client ETL mutation process and may be quite different from the
+target Hive ACID table. The mutation API requires concrete
+implementations of the
+<code>MutatorFactory</code>
+and
+<code>Mutator</code>
+classes to extract pertinent data from records and serialize data into
+the ACID files. Fortunately base classes are provided (
+<code>AbstractMutator</code>
+,
+<code>RecordInspectorImpl</code>
+) to simplify this effort and usually all that is required is the
+specification of a suitable
+<code>ObjectInspector</code>
+and the provision of the indexes of the
+<code>ROW__ID</code>
+and bucketed columns within the record structure. Note that all column
+indexes in these classes are with respect to your record structure, not
+the Hive table structure.
+</p>
+<p>
+You will likely also want to use a
+<code>BucketIdResolver</code>
+to append bucket ids to new records for insertion. Fortunately the core
+implementation is provided in
+<code>BucketIdResolverImpl</code>
+but note that bucket column indexes must be presented in the same order
+as they are in the Hive table definition to ensure consistent bucketing.
+Note that you cannot move records between buckets and an exception will
+be thrown if you attempt to do so. In real terms this means that you
+should not attempt to modify the values in bucket columns with an
+<code>UPDATE</code>
+.
+</p>
+
+<h2>Connection and Transaction management</h2>
+<p>
+The
+<code>MutatorClient</code>
+class is used to create and manage transactions in which mutations can
+be performed. The scope of a transaction can extend across multiple ACID
+tables. When a client connects it communicates with the meta store to
+verify and acquire meta data for the target tables. An invocation of
+<code>newTransaction</code>
+then opens a transaction with the meta store, finalizes a collection of
+<code>AcidTables</code>
+and returns a new
+<code>Transaction</code>
+instance. The acid tables are light-weight, serializable objects that
+are used by the mutation writing components of the API to target
+specific ACID file locations. Usually your
+<code>MutatorClient</code>
+will be running on some master node and your coordinators on worker
+nodes. In this event the
+<code>AcidTableSerializer</code>
+can be used to encode the tables in a more transportable form, for use
+as a
+<code>Configuration</code>
+property for example.
+</p>
+<p>
+As you would expect, a
+<code>Transaction</code>
+must be initiated with a call to
+<code>begin</code>
+before any mutations can be applied. This invocation acquires a lock on
+the targeted tables using the meta store, and initiates a heartbeat to
+prevent transaction timeouts. It is highly recommended that you register
+a
+<code>LockFailureListener</code>
+with the client so that your process can handle any lock or transaction
+failures. Typically you may wish to abort the job in the event of such
+an error. With the transaction in place you can now start streaming
+mutations with one or more
+<code>MutatorCoordinator</code>
+instances (more on this later), and can finally
+<code>commit</code>
+or
+<code>abort</code>
+the transaction when the change set has been applied, which will release
+the lock with the meta store client. Finally you should
+<code>close</code>
+the mutation client to release any held resources.
+</p>
+<p>
+The
+<code>MutatorClientBuilder</code>
+is provided to simplify the construction of clients.
+</p>
+
+<p>
+<b>WARNING:</b> Hive doesn't currently have a deadlock detector (it is
+being worked on as part of <a
+href="https://issues.apache.org/jira/browse/HIVE-9675">HIVE-9675</a>).
+This API could potentially deadlock with other stream writers or with
+SQL users.
+</p>
+<h2>Writing data</h2>
+
+<p>
+The
+<code>MutatorCoordinator</code>
+class is used to issue mutations to an ACID table. You will require at
+least one instance per table participating in the transaction. The
+target of a given instance is defined by the respective
+<code>AcidTable</code>
+used to construct the coordinator. It is recommended that a
+<code>MutatorClientBuilder</code>
+is used to simplify the construction process.
+</p>
+
+<p>
+Mutations can be applied by invoking the respective
+<code>insert</code>
+,
+<code>update</code>
+, and
+<code>delete</code>
+methods on the coordinator. These methods each take as parameters the
+target partition of the record and the mutated record. In the case of an
+unpartitioned table you should simply pass an empty list as the
+partition value. For inserts specifically, only the bucket id will be
+extracted from the
+<code>RecordIdentifier</code>
+, the transactionId and rowId will be ignored and replaced by
+appropriate values in the
+<code>RecordUpdater</code>
+. Additionally, in the case of deletes, everything but the
+<code>RecordIdentifier</code>
+in the record will be ignored and therefore it is often easier to simply
+submit the original record.
+</p>
+
+<p>
+<b>Caution:</b> As mentioned previously, mutations must arrive in
+specific order for the resultant table data to be consistent.
+Coordinators will verify a naturally ordered sequence of
+(lastTransactionId, rowId) and will throw an exception if this sequence
+is broken. This exception should almost certainly be escalated so that
+the transaction is aborted. This, along with the correct ordering of the
+data, is the responsibility of the client using the API.
+</p>
+
+<h3>Dynamic Partition Creation:</h3>
+It is very likely to be desirable to have new partitions created
+automatically (say on an hourly basis). In such cases requiring the Hive
+admin to pre-create the necessary partitions may not be reasonable.
+Consequently the API allows coordinators to create partitions as needed
+(see:
+<code>MutatorClientBuilder.addTable(String, String, boolean)</code>
+). Partition creation being an atomic action, multiple coordinators can
+race to create the partition, but only one would succeed, so
+coordinator clients need not synchronize when creating a partition. The
+user of the coordinator process needs to be given write permissions on
+the Hive table in order to create partitions.
+
+<h2>Reading data</h2>
+
+<p>
+Although this API is concerned with writing changes to data, as
+previously stated we'll almost certainly have to read the existing data
+first to obtain the relevant
+<code>ROW__IDs</code>
+. Therefore it is worth noting that reading ACID data in a robust and
+consistent manner requires the following:
+<ol>
+<li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>).
+</li>
+<li>Acquiring a read-lock with the meta store and issuing
+heartbeats (<code>LockImpl</code> can help with this).
+</li>
+<li>Configuring the <code>OrcInputFormat</code> and then reading
+the data. Make sure that you also pull in the <code>ROW__ID</code>
+values. See: <code>AcidRecordReader.getRecordIdentifier</code>.
+</li>
+<li>Releasing the read-lock.</li>
+</ol>
+</p>
+
+<h2>Example</h2>
+<p>
+<img src="doc-files/system-overview.png" />
+</p>
+<p>So to recap, the sequence of events required to apply mutations
+to a dataset using the API is:</p>
+<ol>
+<li>Create a <code>MutatorClient</code> to manage a transaction for
+the targeted ACID tables. This set of tables should include any
+transactional destinations or sources. Don't forget to register a <code>LockFailureListener</code>
+so that you can handle transaction failures.
+</li>
+<li>Open a new <code>Transaction</code> with the client.
+</li>
+<li>Get the <code>AcidTables</code> from the client.
+</li>
+<li>Begin the transaction.</li>
+<li>Create at least one <code>MutatorCoordinator</code> for each
+table. The <code>AcidTableSerializer</code> can help you transport the <code>AcidTables</code>
+when your workers are in a distributed environment.
+</li>
+<li>Compute your mutation set (this is your ETL merge process).</li>
+<li>Append bucket ids to insertion records. A <code>BucketIdResolver</code>
+can help here.
+</li>
+<li>Group and sort your data appropriately.</li>
+<li>Issue mutation events to your coordinators.</li>
+<li>Close your coordinators.</li>
+<li>Abort or commit the transaction.</li>
+<li>Close your mutation client.</li>
+</ol>
+<p>
+See
+<code>ExampleUseCase</code>
+and
+<code>TestMutations.testUpdatesAndDeletes()</code>
+for some very simple usages.
+</p>
+
+</body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
new file mode 100644
index 0000000..656324c
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/**
+ * A {@link WorkerException} that reports a problem concerning a bucket id.
+ * NOTE(review): presumably raised when a record's bucket id is found to be inconsistent - confirm against the code
+ * that throws it.
+ */
+public class BucketIdException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Package-private: instances can only be created from within the worker package. */
+  BucketIdException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
new file mode 100644
index 0000000..dab2072
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** Computes and appends bucket ids to records that are due to be inserted. */
+public interface BucketIdResolver {
+
+  /**
+   * Attaches a bucket id to the supplied record.
+   * NOTE(review): presumably the id is the one produced by {@link #computeBucketId(Object)} and the returned object
+   * is the record carrying it - confirm against the implementation.
+   */
+  Object attachBucketIdToRecord(Object record);
+
+  /**
+   * Computes the bucket id for the supplied record.
+   * See: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}.
+   */
+  int computeBucketId(Object record);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
new file mode 100644
index 0000000..dbed9e1
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -0,0 +1,76 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * Implementation of a {@link BucketIdResolver} that includes the logic required to calculate a bucket id from a record
+ * that is consistent with Hive's own internal computation scheme.
+ */
+public class BucketIdResolverImpl implements BucketIdResolver {
+
+  /** Placeholder transaction id written into the {@link RecordIdentifier} attached to insert records. */
+  private static final long INVALID_TRANSACTION_ID = -1L;
+  /** Placeholder row id written into the {@link RecordIdentifier} attached to insert records. */
+  private static final long INVALID_ROW_ID = -1L;
+
+  private final SettableStructObjectInspector structObjectInspector;
+  private final StructField[] bucketFields;
+  private final int totalBuckets;
+  private final StructField recordIdentifierField;
+
+  /**
+   * Note that all column indexes are with respect to your record structure, not the Hive table structure. Bucket column
+   * indexes must be presented in the same order as they are in the Hive table definition.
+   *
+   * @param objectInspector inspector for the record structure; must be a {@link SettableStructObjectInspector}
+   * @param recordIdColumn index of the record field that holds the {@link RecordIdentifier}
+   * @param totalBuckets number of buckets in the table; must be at least 1
+   * @param bucketColumns indexes of the record fields that make up the bucketing key; must not be empty
+   * @throws IllegalArgumentException if the inspector is of the wrong type, {@code totalBuckets} is less than 1 or no
+   *           bucket columns are supplied
+   */
+  public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, int totalBuckets, int[] bucketColumns) {
+    if (!(objectInspector instanceof SettableStructObjectInspector)) {
+      // Name the type that is actually required and stay null-safe while building the message.
+      throw new IllegalArgumentException("Serious problem, expected a SettableStructObjectInspector, but got "
+          + (objectInspector == null ? "null" : "a " + objectInspector.getClass().getName()));
+    }
+    if (totalBuckets < 1) {
+      // Fail fast here rather than with an ArithmeticException from the modulo in computeBucketId().
+      throw new IllegalArgumentException("Total buckets must be at least 1, but was " + totalBuckets + ".");
+    }
+    if (bucketColumns.length < 1) {
+      throw new IllegalArgumentException("No bucket column indexes set.");
+    }
+    this.totalBuckets = totalBuckets;
+    structObjectInspector = (SettableStructObjectInspector) objectInspector;
+    List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+
+    recordIdentifierField = structFields.get(recordIdColumn);
+
+    bucketFields = new StructField[bucketColumns.length];
+    for (int i = 0; i < bucketColumns.length; i++) {
+      int bucketColumnsIndex = bucketColumns[i];
+      bucketFields[i] = structFields.get(bucketColumnsIndex);
+    }
+  }
+
+  /**
+   * Computes the bucket id of the record and writes a new {@link RecordIdentifier} holding that bucket id (with
+   * placeholder transaction and row ids) into the record's record identifier field.
+   *
+   * @param record the record to update in place
+   * @return the same record instance that was passed in
+   */
+  @Override
+  public Object attachBucketIdToRecord(Object record) {
+    int bucketId = computeBucketId(record);
+    RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID);
+    structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier);
+    return record;
+  }
+
+  /**
+   * Combines the hash codes of the bucketed column values and reduces the result modulo the total bucket count.
+   * Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}.
+   *
+   * @param record the record whose bucketed columns are hashed
+   * @return the computed bucket id
+   */
+  @Override
+  public int computeBucketId(Object record) {
+    int bucketId = 1;
+
+    for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
+      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
+    }
+
+    // NOTE(review): negating Integer.MIN_VALUE yields Integer.MIN_VALUE again, so that single hash value can still
+    // produce a non-positive result below. Left unchanged so that the mapping stays aligned with the Hive method
+    // referenced above - confirm before altering.
+    if (bucketId < 0) {
+      bucketId = -1 * bucketId;
+    }
+
+    return bucketId % totalBuckets;
+  }
+
+}


[2/3] hive git commit: HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates)

Posted by ga...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
new file mode 100644
index 0000000..9aab346
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
@@ -0,0 +1,83 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */
+class CreatePartitionHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final String databaseName;
+  private final String tableName;
+
+  /**
+   * @param metaStoreClient client used for all meta store calls
+   * @param databaseName database that contains the target table
+   * @param tableName name of the target table
+   */
+  CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) {
+    this.metaStoreClient = metaStoreClient;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  /**
+   * Returns the expected {@link Path} for a given partition value. An empty value list denotes an unpartitioned table,
+   * in which case the table location is returned.
+   *
+   * @param newPartitionValues partition values, or an empty list for an unpartitioned table
+   * @return the location recorded in the meta store for the partition (or table)
+   * @throws WorkerException if the table or partition cannot be found, or the meta store call fails
+   */
+  Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
+    try {
+      String location;
+      if (newPartitionValues.isEmpty()) {
+        location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation();
+      } else {
+        location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation();
+      }
+      LOG.debug("Found path {} for partition {}", location, newPartitionValues);
+      return new Path(location);
+    } catch (NoSuchObjectException e) {
+      // The partition lookup can also raise this exception, so only blame the table when no partition was requested.
+      String missing = newPartitionValues.isEmpty() ? "Table" : "Partition " + newPartitionValues + " or table";
+      throw new WorkerException(missing + " not found '" + databaseName + "." + tableName + "'.", e);
+    } catch (TException e) {
+      throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
+          + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
+    }
+  }
+
+  /**
+   * Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. The new
+   * partition copies the table's storage descriptor and is located beneath the table location.
+   *
+   * @param newPartitionValues partition values, or an empty list for an unpartitioned table
+   * @throws WorkerException if the table cannot be found or the meta store call fails
+   */
+  void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+    if (newPartitionValues.isEmpty()) {
+      return;
+    }
+
+    try {
+      LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
+      Table table = metaStoreClient.getTable(databaseName, tableName);
+
+      Partition partition = new Partition();
+      partition.setDbName(table.getDbName());
+      partition.setTableName(table.getTableName());
+      StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+      partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+          + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
+      partition.setSd(partitionSd);
+      partition.setValues(newPartitionValues);
+
+      metaStoreClient.add_partition(partition);
+    } catch (AlreadyExistsException e) {
+      // Expected when the partition was created earlier; treated as success.
+      LOG.debug("Partition already exists: {}.{}:{}", databaseName, tableName, newPartitionValues);
+    } catch (NoSuchObjectException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
+    } catch (TException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
+          + databaseName + "." + tableName + "'", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
new file mode 100644
index 0000000..f8e46d6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/**
+ * A {@link WorkerException} raised when an event is submitted for a (partition, bucketId) combination that has already
+ * been closed.
+ */
+public class GroupRevisitedException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Package-private constructor: only code in this package can create instances.
+   *
+   * @param message description of the revisited group
+   */
+  GroupRevisitedException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
new file mode 100644
index 0000000..8ae3904
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java
@@ -0,0 +1,74 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Tracks the (partition, bucket) combinations that have been encountered, checking that a group is not revisited.
+ * Potentially memory intensive. Instances are stateful and not safe for concurrent use.
+ */
+class GroupingValidator {
+
+  private static final char SEPARATOR = '/';
+  private static final char ESCAPE = '\\';
+
+  /** Bucket ids seen so far, keyed by the encoded partition values. */
+  private final Map<String, Set<Integer>> visited;
+  /** Reused scratch buffer for building partition keys. */
+  private final StringBuilder partitionKeyBuilder;
+  private long groups;
+  private String lastPartitionKey;
+  private int lastBucketId = -1;
+
+  GroupingValidator() {
+    visited = new HashMap<String, Set<Integer>>();
+    partitionKeyBuilder = new StringBuilder(64);
+  }
+
+  /**
+   * Checks that this group is either the same as the last or is a new group.
+   *
+   * @param partitionValues partition values of the group; must not be {@code null}
+   * @param bucketId bucket id of the group
+   * @return {@code true} if the group equals the previously submitted group or has never been seen, {@code false} if
+   *         it was seen earlier and a different group has been submitted in between
+   */
+  boolean isInSequence(List<String> partitionValues, int bucketId) {
+    String partitionKey = getPartitionKey(partitionValues);
+    if (Objects.equals(lastPartitionKey, partitionKey) && lastBucketId == bucketId) {
+      return true;
+    }
+    lastPartitionKey = partitionKey;
+    lastBucketId = bucketId;
+
+    Set<Integer> bucketIdSet = visited.get(partitionKey);
+    if (bucketIdSet == null) {
+      // If the bucket id set component of this data structure proves to be too large there is the
+      // option of moving it to Trove or HPPC in an effort to reduce size.
+      bucketIdSet = new HashSet<>();
+      visited.put(partitionKey, bucketIdSet);
+    }
+
+    boolean newGroup = bucketIdSet.add(bucketId);
+    if (newGroup) {
+      groups++;
+    }
+    return newGroup;
+  }
+
+  /**
+   * Encodes the partition values as a single '/' separated key. Separator and escape characters that occur inside a
+   * value are escaped so that distinct value lists such as ("a/b", "c") and ("a", "b/c") can never share a key.
+   */
+  private String getPartitionKey(List<String> partitionValues) {
+    partitionKeyBuilder.setLength(0);
+    boolean first = true;
+    for (String element : partitionValues) {
+      if (first) {
+        first = false;
+      } else {
+        partitionKeyBuilder.append(SEPARATOR);
+      }
+      appendEscaped(element);
+    }
+    return partitionKeyBuilder.toString();
+  }
+
+  /** Appends a single partition value, escaping characters that would otherwise be ambiguous in the key. */
+  private void appendEscaped(String element) {
+    // A null element is rendered as "null", matching what appending a null String to the builder would produce.
+    String text = String.valueOf(element);
+    for (int i = 0; i < text.length(); i++) {
+      char c = text.charAt(i);
+      if (c == SEPARATOR || c == ESCAPE) {
+        partitionKeyBuilder.append(ESCAPE);
+      }
+      partitionKeyBuilder.append(c);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "GroupingValidator [groups=" + groups + ",lastPartitionKey=" + lastPartitionKey + ",lastBucketId="
+        + lastBucketId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
new file mode 100644
index 0000000..96ecce9
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java
@@ -0,0 +1,21 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+/**
+ * Interface for submitting mutation events to a given partition and bucket in an ACID table. Requires records to arrive
+ * in the order defined by the {@link SequenceValidator}.
+ */
+public interface Mutator extends Closeable, Flushable {
+
+  /**
+   * Submits an insert event for the given record.
+   *
+   * @param record the record to insert
+   * @throws IOException if the event could not be written
+   */
+  void insert(Object record) throws IOException;
+
+  /**
+   * Submits an update event for the given record.
+   *
+   * @param record the record to update
+   * @throws IOException if the event could not be written
+   */
+  void update(Object record) throws IOException;
+
+  /**
+   * Submits a delete event for the given record.
+   *
+   * @param record the record to delete
+   * @throws IOException if the event could not be written
+   */
+  void delete(Object record) throws IOException;
+
+  /** Redeclares {@link Flushable#flush()} as part of this interface. */
+  void flush() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
new file mode 100644
index 0000000..96f05e5
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -0,0 +1,281 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. Events must be grouped
+ * by partition, then bucket and ordered by origTxnId, then rowId. Ordering is enforced by the {@link SequenceValidator}
+ * and grouping is enforced by the {@link GroupingValidator}. An acid delta file is created for each combination of
+ * partition and bucket id (a single transaction id is implied). Once a delta file has been closed it cannot be
+ * reopened. Therefore care is needed to group the data correctly, otherwise failures will occur if a delta belonging
+ * to a group has been previously closed. The {@link MutatorCoordinator} will seamlessly handle transitions between
+ * groups, creating and closing {@link Mutator Mutators} as needed to write to the appropriate partition and bucket.
+ * New partitions will be created in the meta store if {@link AcidTable#createPartitions()} is set.
+ * <p>
+ * {@link #insert(List, Object) Insert} events must be artificially assigned appropriate bucket ids in the preceding
+ * grouping phase so that they are grouped correctly. Note that any transaction id or row id assigned to the
+ * {@link RecordIdentifier RecordIdentifier} of such events will be ignored by both the coordinator and the underlying
+ * {@link RecordUpdater}.
+ */
+public class MutatorCoordinator implements Closeable, Flushable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final MutatorFactory mutatorFactory;
+  private final GroupingValidator groupingValidator;
+  private final SequenceValidator sequenceValidator;
+  private final AcidTable table;
+  private final RecordInspector recordInspector;
+  private final CreatePartitionHelper partitionHelper;
+  private final AcidOutputFormat<?, ?> outputFormat;
+  private final BucketIdResolver bucketIdResolver;
+  private final HiveConf configuration;
+  private final boolean deleteDeltaIfExists;
+
+  private int bucketId;
+  private List<String> partitionValues;
+  private Path partitionPath;
+  private Mutator mutator;
+
+  /**
+   * Creates a coordinator that uses a new {@link CreatePartitionHelper} for the table together with fresh
+   * {@link GroupingValidator} and {@link SequenceValidator} instances.
+   *
+   * @param metaStoreClient client used for partition look-ups and creation; closed by {@link #close()}
+   * @param configuration configuration used to instantiate the output format and to access the file system
+   * @param mutatorFactory factory for mutators, the record inspector and the bucket id resolver
+   * @param table the destination ACID table
+   * @param deleteDeltaIfExists whether an existing delta file is deleted before a mutator is created for it
+   * @throws WorkerException if the table's output format class cannot be located
+   */
+  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
+    this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient,
+        table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table,
+        deleteDeltaIfExists);
+  }
+
+  /**
+   * Visible for testing only. Accepts the partition helper and both validators as arguments so that they can be
+   * substituted.
+   *
+   * @throws WorkerException if the table's output format class cannot be located
+   */
+  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+      CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator,
+      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
+    this.metaStoreClient = metaStoreClient;
+    this.configuration = configuration;
+    this.mutatorFactory = mutatorFactory;
+    this.partitionHelper = partitionHelper;
+    this.groupingValidator = groupingValidator;
+    this.sequenceValidator = sequenceValidator;
+    this.table = table;
+    this.deleteDeltaIfExists = deleteDeltaIfExists;
+    this.recordInspector = this.mutatorFactory.newRecordInspector();
+    bucketIdResolver = this.mutatorFactory.newBucketIdResolver(table.getTotalBuckets());
+
+    // -1 marks that no bucket has been selected yet; the first event therefore always triggers a mutator reset.
+    bucketId = -1;
+    outputFormat = createOutputFormat(table.getOutputFormatName(), configuration);
+  }
+
+  /**
+   * Submits an insert event for the given record. We expect records grouped by (partitionValues,bucketId) and ordered
+   * by (origTxnId,rowId).
+   * 
+   * @param partitionValues The partition values of the record; {@code null} is treated as unpartitioned.
+   * @param record The record to insert.
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException If the mutator could not be reset or failed to write the record.
+   */
+  public void insert(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.INSERT, partitionValues, record);
+    try {
+      mutator.insert(record);
+      LOG.debug("Inserted into partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      // The record is now wrapped in a balanced pair of quotes.
+      throw new WorkerException("Failed to insert record '" + record + "' using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  /**
+   * Submits an update event for the given record. We expect records grouped by (partitionValues,bucketId) and ordered
+   * by (origTxnId,rowId).
+   * 
+   * @param partitionValues The partition values of the record; {@code null} is treated as unpartitioned.
+   * @param record The record to update.
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException If the mutator could not be reset or failed to write the record.
+   */
+  public void update(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.UPDATE, partitionValues, record);
+    try {
+      mutator.update(record);
+      LOG.debug("Updated in partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      // The record is now wrapped in a balanced pair of quotes.
+      throw new WorkerException("Failed to update record '" + record + "' using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  /**
+   * Submits a delete event for the given record. We expect records grouped by (partitionValues,bucketId) and ordered
+   * by (origTxnId,rowId).
+   * 
+   * @param partitionValues The partition values of the record; {@code null} is treated as unpartitioned.
+   * @param record The record to delete.
+   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
+   *           using the values in the record's bucketed columns.
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   *           sequence.
+   * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
+   *           been closed.
+   * @throws PartitionCreationException Could not create a new partition in the meta store.
+   * @throws WorkerException If the mutator could not be reset or failed to write the record.
+   */
+  public void delete(List<String> partitionValues, Object record) throws WorkerException {
+    reconfigureState(OperationType.DELETE, partitionValues, record);
+    try {
+      mutator.delete(record);
+      LOG.debug("Deleted from partition={}, record={}", partitionValues, record);
+    } catch (IOException e) {
+      // The record is now wrapped in a balanced pair of quotes.
+      throw new WorkerException("Failed to delete record '" + record + "' using mutator '" + mutator + "'.", e);
+    }
+  }
+
+  /**
+   * Closes the current {@link Mutator}, if there is one, and then the meta store client. The meta store client is
+   * closed even when closing the mutator fails.
+   *
+   * @throws IOException if the mutator could not be closed
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (mutator != null) {
+        mutator.close();
+      }
+    } finally {
+      metaStoreClient.close();
+    }
+  }
+
+  /**
+   * Flushes the current {@link Mutator}. Does nothing if no mutator has been created yet.
+   *
+   * @throws IOException if the mutator could not be flushed
+   */
+  @Override
+  public void flush() throws IOException {
+    if (mutator != null) {
+      mutator.flush();
+    }
+  }
+
+  /**
+   * Prepares the coordinator for the next event. The record's identifier is extracted and checked; then, depending on
+   * whether the partition or the bucket differs from the current group, either a new {@link Mutator} is set up or the
+   * record is checked for being in sequence within the current group.
+   *
+   * @param operationType the kind of mutation about to be applied
+   * @param newPartitionValues partition values of the incoming record; {@code null} is treated as unpartitioned
+   * @param record the incoming record
+   * @throws WorkerException if validation fails, the partition cannot be created or resolved, or the mutator cannot
+   *           be reset
+   */
+  private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record)
+    throws WorkerException {
+    // Runs before the null normalisation below, so an error message produced here may show 'null' partition values.
+    RecordIdentifier newRecordIdentifier = extractRecordIdentifier(operationType, newPartitionValues, record);
+    int newBucketId = newRecordIdentifier.getBucketId();
+
+    if (newPartitionValues == null) {
+      newPartitionValues = Collections.emptyList();
+    }
+
+    try {
+      if (partitionHasChanged(newPartitionValues)) {
+        if (table.createPartitions()) {
+          partitionHelper.createPartitionIfNotExists(newPartitionValues);
+        }
+        Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
+        resetMutator(newBucketId, newPartitionValues, newPartitionPath);
+      } else if (bucketIdHasChanged(newBucketId)) {
+        // Same partition, different bucket: reuse the already resolved partition values and path.
+        resetMutator(newBucketId, partitionValues, partitionPath);
+      } else {
+        // NOTE(review): only records that stay within the current group reach the sequence check; the first record
+        // of a new group is not passed to the SequenceValidator - confirm that this is intended.
+        validateRecordSequence(operationType, newRecordIdentifier);
+      }
+    } catch (IOException e) {
+      throw new WorkerException("Failed to reset mutator when performing " + operationType + " of record: " + record, e);
+    }
+  }
+
+  /**
+   * Reads the {@link RecordIdentifier} from the record and, for every operation other than a delete, verifies that
+   * its bucket id agrees with the bucket id computed from the record itself.
+   *
+   * @param operationType the kind of mutation about to be applied
+   * @param newPartitionValues partition values of the record; only used in the error message
+   * @param record the incoming record
+   * @return the identifier extracted from the record
+   * @throws BucketIdException if the identifier's bucket id differs from the computed bucket id
+   */
+  private RecordIdentifier extractRecordIdentifier(OperationType operationType, List<String> newPartitionValues,
+      Object record) throws BucketIdException {
+    RecordIdentifier identifier = recordInspector.extractRecordIdentifier(record);
+    int expectedBucketId = bucketIdResolver.computeBucketId(record);
+
+    boolean bucketIdMustMatch = operationType != OperationType.DELETE;
+    if (bucketIdMustMatch && identifier.getBucketId() != expectedBucketId) {
+      String message = "RecordIdentifier.bucketId != computed bucketId (" + expectedBucketId
+          + ") for record " + identifier + " in partition " + newPartitionValues + ".";
+      throw new BucketIdException(message);
+    }
+    return identifier;
+  }
+
+  /**
+   * Switches the coordinator to a new (partition, bucket) group: closes the current {@link Mutator} if present,
+   * validates the grouping, resets the sequence validator, optionally deletes an existing delta file and finally
+   * creates the mutator for the new group and records the new group as current.
+   *
+   * @param newBucketId bucket id of the new group
+   * @param newPartitionValues partition values of the new group
+   * @param newPartitionPath location of the new group's partition
+   * @throws IOException if the old mutator cannot be closed, the delta cannot be deleted or the new mutator cannot be
+   *           created
+   * @throws GroupRevisitedException if the new group has been visited before
+   */
+  private void resetMutator(int newBucketId, List<String> newPartitionValues, Path newPartitionPath)
+    throws IOException, GroupRevisitedException {
+    if (mutator != null) {
+      mutator.close();
+    }
+    // The current-group fields are only updated further down, after validation and mutator creation have succeeded.
+    validateGrouping(newPartitionValues, newBucketId);
+    sequenceValidator.reset();
+    if (deleteDeltaIfExists) {
+      // TODO: Should this be the concern of the mutator?
+      deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId);
+    }
+    mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId);
+    bucketId = newBucketId;
+    partitionValues = newPartitionValues;
+    partitionPath = newPartitionPath;
+    LOG.debug("Reset mutator: bucketId={}, partition={}, partitionPath={}", bucketId, partitionValues, partitionPath);
+  }
+
+  /** Reports, and logs at debug level, whether the given partition values differ from the current partition. */
+  private boolean partitionHasChanged(List<String> newPartitionValues) {
+    if (Objects.equals(this.partitionValues, newPartitionValues)) {
+      return false;
+    }
+    LOG.debug("Partition changed from={}, to={}", this.partitionValues, newPartitionValues);
+    return true;
+  }
+
+  /** Reports, and logs at debug level, whether the given bucket id differs from the current bucket id. */
+  private boolean bucketIdHasChanged(int newBucketId) {
+    if (this.bucketId == newBucketId) {
+      return false;
+    }
+    LOG.debug("Bucket ID changed from={}, to={}", this.bucketId, newBucketId);
+    return true;
+  }
+
+  /**
+   * Checks that the group about to be opened has not been visited before.
+   *
+   * @param newPartitionValues partition values of the group about to be opened
+   * @param newBucketId bucket id of the group about to be opened
+   * @throws GroupRevisitedException if the (partition, bucket) combination has been visited previously
+   */
+  private void validateGrouping(List<String> newPartitionValues, int newBucketId) throws GroupRevisitedException {
+    // Validate the incoming bucket id rather than the 'bucketId' field: at this point the field still holds the id of
+    // the previous group, so checking it would register the wrong group and let a revisited group go unnoticed.
+    if (!groupingValidator.isInSequence(newPartitionValues, newBucketId)) {
+      throw new GroupRevisitedException("Group out of sequence: state=" + groupingValidator + ", partition="
+          + newPartitionValues + ", bucketId=" + newBucketId);
+    }
+  }
+
+  /**
+   * Verifies that an update or delete event arrives in sequence. Insert events are not checked.
+   *
+   * @param operationType the kind of mutation about to be applied
+   * @param newRecordIdentifier identifier of the incoming record
+   * @throws RecordSequenceException if the sequence validator rejects the identifier
+   */
+  private void validateRecordSequence(OperationType operationType, RecordIdentifier newRecordIdentifier)
+    throws RecordSequenceException {
+    if (operationType == OperationType.INSERT) {
+      return;
+    }
+    if (!sequenceValidator.isInSequence(newRecordIdentifier)) {
+      throw new RecordSequenceException("Records not in sequence: state=" + sequenceValidator + ", recordIdentifier="
+          + newRecordIdentifier);
+    }
+  }
+
+  /**
+   * Loads the named output format class and instantiates it reflectively with the given configuration.
+   *
+   * @param outputFormatName fully qualified class name of the output format
+   * @param configuration configuration handed to the new instance
+   * @return the new output format instance
+   * @throws WorkerException if the class cannot be located
+   */
+  @SuppressWarnings("unchecked")
+  private AcidOutputFormat<?, ?> createOutputFormat(String outputFormatName, HiveConf configuration)
+    throws WorkerException {
+    Class<?> outputFormatClass;
+    try {
+      outputFormatClass = JavaUtils.loadClass(outputFormatName);
+    } catch (ClassNotFoundException e) {
+      throw new WorkerException("Could not locate class for '" + outputFormatName + "'.", e);
+    }
+    return (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(outputFormatClass, configuration);
+  }
+
+  /**
+   * Removes the delta file for the given partition, transaction and bucket if it is present on the file system.
+   *
+   * @param partitionPath location of the partition
+   * @param transactionId transaction id used as both the minimum and maximum transaction id of the delta
+   * @param bucketId bucket id of the delta
+   * @throws IOException if the file system cannot be accessed
+   */
+  private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
+    AcidOutputFormat.Options deltaOptions = new AcidOutputFormat.Options(configuration)
+        .bucket(bucketId)
+        .minimumTransactionId(transactionId)
+        .maximumTransactionId(transactionId);
+    Path deltaPath = AcidUtils.createFilename(partitionPath, deltaOptions);
+    FileSystem fileSystem = deltaPath.getFileSystem(configuration);
+    if (!fileSystem.exists(deltaPath)) {
+      return;
+    }
+    LOG.info("Deleting existing delta path: {}", deltaPath);
+    fileSystem.delete(deltaPath, false);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
new file mode 100644
index 0000000..8851ea6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
@@ -0,0 +1,76 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
+import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+
+/** Convenience class for building {@link MutatorCoordinator} instances. */
+public class MutatorCoordinatorBuilder {
+
+  private HiveConf configuration;
+  private MutatorFactory mutatorFactory;
+  private UserGroupInformation authenticatedUser;
+  private String metaStoreUri;
+  private AcidTable table;
+  private boolean deleteDeltaIfExists;
+
+  /** Sets the configuration from which the coordinator's configuration is derived. */
+  public MutatorCoordinatorBuilder configuration(HiveConf configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
+  /** Sets the user on whose behalf the meta store is accessed. When unset, the 'user.name' system property is used. */
+  public MutatorCoordinatorBuilder authenticatedUser(UserGroupInformation authenticatedUser) {
+    this.authenticatedUser = authenticatedUser;
+    return this;
+  }
+
+  /** Sets the URI of the meta store to connect to. */
+  public MutatorCoordinatorBuilder metaStoreUri(String metaStoreUri) {
+    this.metaStoreUri = metaStoreUri;
+    return this;
+  }
+
+  /** Set the destination ACID table for this client. */
+  public MutatorCoordinatorBuilder table(AcidTable table) {
+    this.table = table;
+    return this;
+  }
+
+  /**
+   * If the delta file already exists, delete it. This is useful in a MapReduce setting where a number of task retries
+   * will attempt to write the same delta file.
+   */
+  public MutatorCoordinatorBuilder deleteDeltaIfExists() {
+    this.deleteDeltaIfExists = true;
+    return this;
+  }
+
+  /** Sets the factory that supplies mutators, the record inspector and the bucket id resolver. */
+  public MutatorCoordinatorBuilder mutatorFactory(MutatorFactory mutatorFactory) {
+    this.mutatorFactory = mutatorFactory;
+    return this;
+  }
+
+  /**
+   * Creates the meta store client and the {@link MutatorCoordinator} from the values supplied so far.
+   *
+   * @return a new coordinator
+   * @throws WorkerException if the meta store client or the coordinator could not be created
+   * @throws MetaException if the underlying meta store client could not be obtained
+   */
+  public MutatorCoordinator build() throws WorkerException, MetaException {
+    String user;
+    boolean secureMode;
+    if (authenticatedUser == null) {
+      user = System.getProperty("user.name");
+      secureMode = false;
+    } else {
+      user = authenticatedUser.getShortUserName();
+      secureMode = authenticatedUser.hasKerberosCredentials();
+    }
+
+    configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
+
+    IMetaStoreClient metaStoreClient;
+    try {
+      UgiMetaStoreClientFactory clientFactory = new UgiMetaStoreClientFactory(metaStoreUri, configuration,
+          authenticatedUser, user, secureMode);
+      metaStoreClient = clientFactory.newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+    } catch (IOException e) {
+      throw new WorkerException("Could not create meta store client.", e);
+    }
+
+    return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
new file mode 100644
index 0000000..850054f
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
@@ -0,0 +1,16 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+
+/**
+ * Factory for the collaborators that a {@link MutatorCoordinator} requires: {@link Mutator Mutators}, a
+ * {@link RecordInspector} and a {@link BucketIdResolver}.
+ */
+public interface MutatorFactory {
+
+  /**
+   * Creates a mutator for a single (partition, bucket) group.
+   *
+   * @param outputFormat the output format instance for the table
+   * @param transactionId the transaction id under which events are written
+   * @param partitionPath location of the partition to write to
+   * @param bucketId bucket to write to
+   * @throws IOException if the mutator could not be created
+   */
+  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException;
+  
+  /** Creates the inspector used to extract record identifiers from records. */
+  RecordInspector newRecordInspector();
+  
+  /**
+   * Creates a resolver that computes bucket ids for records.
+   *
+   * @param totalBuckets the total number of buckets of the table
+   */
+  BucketIdResolver newBucketIdResolver(int totalBuckets);
+  
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
new file mode 100644
index 0000000..0fe41d5
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -0,0 +1,84 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. */
+public class MutatorImpl implements Mutator {
+
+  private final long transactionId;
+  private final Path partitionPath;
+  private final int bucketId;
+  private final Configuration configuration;
+  private final int recordIdColumn;
+  private final ObjectInspector objectInspector;
+  private RecordUpdater updater; // null once close() has completed
+
+  /** Captures the supplied settings and then creates the {@link RecordUpdater} via {@link #createRecordUpdater}. */
+  public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector,
+      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException {
+    this.configuration = configuration;
+    this.recordIdColumn = recordIdColumn;
+    this.objectInspector = objectInspector;
+    this.transactionId = transactionId;
+    this.partitionPath = partitionPath;
+    this.bucketId = bucketId;
+    // Must come last: createRecordUpdater reads the fields assigned above.
+    updater = createRecordUpdater(outputFormat);
+  }
+
+  @Override
+  public void insert(Object record) throws IOException {
+    updater.insert(transactionId, record);
+  }
+
+  @Override
+  public void update(Object record) throws IOException {
+    updater.update(transactionId, record);
+  }
+
+  @Override
+  public void delete(Object record) throws IOException {
+    updater.delete(transactionId, record);
+  }
+
+  /**
+   * This implementation intentionally does nothing at this time. We only use a single transaction and
+   * {@link OrcRecordUpdater#flush()} will purposefully throw an exception in this instance. We keep this here in the
+   * event that we support multiple transactions and to make it clear that the omission of an invocation of
+   * {@link OrcRecordUpdater#flush()} was not a mistake.
+   */
+  @Override
+  public void flush() throws IOException {
+    // Intentionally do nothing
+  }
+
+  /** Closes the underlying {@link RecordUpdater}; calling this again after a successful close is a no-op. */
+  @Override
+  public void close() throws IOException {
+    if (updater != null) {
+      updater.close(false);
+      updater = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MutatorImpl [transactionId=" + transactionId + ", partitionPath=" + partitionPath
+        + ", bucketId=" + bucketId + "]";
+  }
+
+  /** Builds the updater from this mutator's settings; subclasses may override to customize it. */
+  protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> outputFormat) throws IOException {
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(configuration).inspector(objectInspector)
+        .bucket(bucketId).minimumTransactionId(transactionId).maximumTransactionId(transactionId)
+        .recordIdColumn(recordIdColumn);
+    return outputFormat.getRecordUpdater(partitionPath, options);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
new file mode 100644
index 0000000..5ecb1bb
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java
@@ -0,0 +1,7 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+enum OperationType {
+  INSERT,
+  UPDATE,
+  DELETE;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
new file mode 100644
index 0000000..5b59e01
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** {@link WorkerException} subtype for partition creation failures; the throw site is not visible in this file. */
+public class PartitionCreationException extends WorkerException {
+  private static final long serialVersionUID = 1L;
+
+  /** Message-only variant. */
+  PartitionCreationException(String message) {
+    super(message);
+  }
+  /** Variant that also records the underlying cause. */
+  PartitionCreationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
new file mode 100644
index 0000000..11ef0dd
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+
+/** Provides a means to extract a {@link RecordIdentifier} from record objects. */
+public interface RecordInspector {
+
+  /** Gets the {@link RecordIdentifier} from the record - to be used for updates and deletes only. */
+  RecordIdentifier extractRecordIdentifier(Object record);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
new file mode 100644
index 0000000..18ee458
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java
@@ -0,0 +1,45 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Standard {@link RecordInspector} implementation that uses the supplied {@link ObjectInspector} and
+ * {@link AcidOutputFormat.Options#recordIdColumn(int) record id column} to extract {@link RecordIdentifier
+ * RecordIdentifiers}, and calculate bucket ids from records.
+ */
+public class RecordInspectorImpl implements RecordInspector {
+
+  private final StructObjectInspector structObjectInspector;
+  private final StructField recordIdentifierField;
+
+  /**
+   * Note that all column indexes are with respect to your record structure, not the Hive table structure.
+   */
+  public RecordInspectorImpl(ObjectInspector objectInspector, int recordIdColumn) {
+    if (!(objectInspector instanceof StructObjectInspector)) {
+      throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a "
+          + objectInspector.getClass().getName());
+    }
+
+    structObjectInspector = (StructObjectInspector) objectInspector;
+    List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+    recordIdentifierField = structFields.get(recordIdColumn);
+  }
+
+  public RecordIdentifier extractRecordIdentifier(Object record) {
+    return (RecordIdentifier) structObjectInspector.getStructFieldData(record, recordIdentifierField);
+  }
+
+  @Override
+  public String toString() {
+    return "RecordInspectorImpl [structObjectInspector=" + structObjectInspector + ", recordIdentifierField="
+        + recordIdentifierField + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
new file mode 100644
index 0000000..6b034f1
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** {@link WorkerException} subtype; presumably signals out-of-sequence records - the throw site is not visible here. */
+public class RecordSequenceException extends WorkerException {
+  private static final long serialVersionUID = 1L;
+
+  /** Creates the exception with a detail message; package-private, so only this package can construct it. */
+  RecordSequenceException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
new file mode 100644
index 0000000..bcff4d6
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
@@ -0,0 +1,49 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that the sequence of {@link RecordIdentifier RecordIdentifiers} are in a valid order for insertion into an
+ * ACID delta file in a given partition and bucket.
+ */
+class SequenceValidator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class);
+
+  private Long lastTxId;
+  private Long lastRowId;
+
+  SequenceValidator() {
+  }
+
+  boolean isInSequence(RecordIdentifier recordIdentifier) {
+    if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) {
+      LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier);
+      return false;
+    } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null
+        && recordIdentifier.getRowId() <= lastRowId) {
+      LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier);
+      return false;
+    }
+    lastTxId = recordIdentifier.getTransactionId();
+    lastRowId = recordIdentifier.getRowId();
+    return true;
+  }
+
+  /**
+   * Validator must be reset for each new partition and or bucket.
+   */
+  void reset() {
+    lastTxId = null;
+    lastRowId = null;
+    LOG.debug("reset");
+  }
+
+  @Override
+  public String toString() {
+    return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
new file mode 100644
index 0000000..1fa1998
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** Checked exception that {@link PartitionCreationException} and {@link RecordSequenceException} extend. */
+public class WorkerException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /** Message-only variant. */
+  WorkerException(String message) {
+    super(message);
+  }
+  /** Variant that also records the underlying cause. */
+  WorkerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
new file mode 100644
index 0000000..86d70d4
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java
@@ -0,0 +1,82 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.util.List;
+
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+
+public class ExampleUseCase {
+  // Placeholder state: nothing in this class assigns these fields (only createPartitions has an initial value).
+  private String metaStoreUri;
+  private String databaseName;
+  private String tableName;
+  private boolean createPartitions = true;
+  private List<String> partitionValues1, partitionValues2, partitionValues3;
+  private Object record1, record2, record3;
+  private MutatorFactory mutatorFactory;
+
+  /** This is an illustration, not a functioning example. */
+  public void example() throws Exception {
+    // CLIENT/TOOL END
+    //
+    // Singleton instance in the job client
+
+    // Create a client to manage our transaction
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(databaseName, tableName, createPartitions)
+        .metaStoreUri(metaStoreUri)
+        .build();
+
+    // Get the transaction
+    Transaction transaction = client.newTransaction();
+
+    // Get serializable details of the destination tables
+    List<AcidTable> tables = client.getTables();
+
+    transaction.begin();
+
+    // CLUSTER / WORKER END
+    //
+    // Job submitted to the cluster
+    //
+    // Attach a bucket id to the record via the resolver (only record1 is shown here)
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(tables.get(0).getTotalBuckets());
+    record1 = bucketIdResolver.attachBucketIdToRecord(record1);
+
+    // --------------------------------------------------------------
+    // DATA SHOULD GET SORTED BY YOUR ETL/MERGE PROCESS HERE
+    //
+    // Group the data by (partitionValues, ROW__ID.bucketId)
+    // Order the groups by (ROW__ID.lastTransactionId, ROW__ID.rowId)
+    // --------------------------------------------------------------
+
+    // One of these runs at the output of each reducer
+    //
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(tables.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    coordinator.insert(partitionValues1, record1);
+    coordinator.update(partitionValues2, record2);
+    coordinator.delete(partitionValues3, record3);
+    // The coordinator is closed before the client commits the transaction below
+    coordinator.close();
+
+    // CLIENT/TOOL END
+    //
+    // The tasks have completed, control is back at the tool
+
+    transaction.commit();
+
+    client.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
new file mode 100644
index 0000000..0d87a31
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.Text;
+
+public class MutableRecord {
+
+  // Column 0
+  public final int id;
+  // Column 1
+  public final Text msg;
+  // Column 2
+  public RecordIdentifier rowId;
+
+  public MutableRecord(int id, String msg, RecordIdentifier rowId) {
+    this.id = id;
+    this.msg = new Text(msg);
+    this.rowId = rowId;
+  }
+
+  public MutableRecord(int id, String msg) {
+    this.id = id;
+    this.msg = new Text(msg);
+    rowId = null;
+  }
+
+  @Override
+  public String toString() {
+    return "MutableRecord [id=" + id + ", msg=" + msg + ", rowId=" + rowId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
new file mode 100644
index 0000000..2a851c8
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
@@ -0,0 +1,51 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolverImpl;
+import org.apache.hive.hcatalog.streaming.mutate.worker.Mutator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorImpl;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspector;
+import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspectorImpl;
+
+/** {@link MutatorFactory} whose {@link ObjectInspector} is obtained by reflection on the supplied record class. */
+public class ReflectiveMutatorFactory implements MutatorFactory {
+
+  private final Configuration configuration;
+  private final ObjectInspector objectInspector;
+  private final int recordIdColumn;
+  private final int[] bucketColumnIndexes;
+
+  public ReflectiveMutatorFactory(Configuration configuration, Class<?> recordClass, int recordIdColumn,
+      int[] bucketColumnIndexes) {
+    this.objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(recordClass,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    this.configuration = configuration;
+    this.recordIdColumn = recordIdColumn;
+    this.bucketColumnIndexes = bucketColumnIndexes;
+  }
+
+  @Override
+  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+    throws IOException {
+    return new MutatorImpl(this.configuration, this.recordIdColumn, this.objectInspector, outputFormat,
+        transactionId, partitionPath, bucketId);
+  }
+
+  @Override
+  public RecordInspector newRecordInspector() {
+    return new RecordInspectorImpl(this.objectInspector, this.recordIdColumn);
+  }
+
+  @Override
+  public BucketIdResolver newBucketIdResolver(int totalBuckets) {
+    return new BucketIdResolverImpl(this.objectInspector, this.recordIdColumn, totalBuckets, this.bucketColumnIndexes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
new file mode 100644
index 0000000..477ed8c
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+
+public class StreamingAssert {
+
+  public static class Factory {
+    private IMetaStoreClient metaStoreClient;
+    private final HiveConf conf;
+
+    public Factory(IMetaStoreClient metaStoreClient, HiveConf conf) {
+      this.metaStoreClient = metaStoreClient;
+      this.conf = conf;
+    }
+
+    public StreamingAssert newStreamingAssert(Table table) throws Exception {
+      return newStreamingAssert(table, Collections.<String> emptyList());
+    }
+
+    public StreamingAssert newStreamingAssert(Table table, List<String> partition) throws Exception {
+      return new StreamingAssert(metaStoreClient, conf, table, partition);
+    }
+  }
+
+  private Table table;
+  private List<String> partition;
+  private IMetaStoreClient metaStoreClient;
+  private Directory dir;
+  private ValidTxnList txns;
+  private List<AcidUtils.ParsedDelta> currentDeltas;
+  private long min;
+  private long max;
+  private Path partitionLocation;
+
+  StreamingAssert(IMetaStoreClient metaStoreClient, HiveConf conf, Table table, List<String> partition)
+      throws Exception {
+    this.metaStoreClient = metaStoreClient;
+    this.table = table;
+    this.partition = partition;
+
+    txns = metaStoreClient.getValidTxns();
+    partitionLocation = getPartitionLocation();
+    dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+    assertEquals(0, dir.getObsolete().size());
+    assertEquals(0, dir.getOriginalFiles().size());
+
+    currentDeltas = dir.getCurrentDirectories();
+    min = Long.MAX_VALUE;
+    max = Long.MIN_VALUE;
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
+      System.out.println(parsedDelta.getPath().toString());
+      max = Math.max(parsedDelta.getMaxTransaction(), max);
+      min = Math.min(parsedDelta.getMinTransaction(), min);
+    }
+  }
+
+  public void assertExpectedFileCount(int expectedFileCount) {
+    assertEquals(expectedFileCount, currentDeltas.size());
+  }
+
+  public void assertNothingWritten() {
+    assertExpectedFileCount(0);
+  }
+
+  public void assertMinTransactionId(long expectedMinTransactionId) {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    assertEquals(expectedMinTransactionId, min);
+  }
+
+  public void assertMaxTransactionId(long expectedMaxTransactionId) {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    assertEquals(expectedMaxTransactionId, max);
+  }
+
+  List<Record> readRecords() throws Exception {
+    if (currentDeltas.isEmpty()) {
+      throw new AssertionError("No data");
+    }
+    InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", partitionLocation.toString());
+    job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    InputSplit[] splits = inputFormat.getSplits(job, 1);
+    assertEquals(1, splits.length);
+
+    final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+        .getRecordReader(splits[0], job, Reporter.NULL);
+
+    NullWritable key = recordReader.createKey();
+    OrcStruct value = recordReader.createValue();
+
+    List<Record> records = new ArrayList<>();
+    while (recordReader.next(key, value)) {
+      RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+      Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+          recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString());
+      System.out.println(record);
+      records.add(record);
+    }
+    recordReader.close();
+    return records;
+  }
+
+  private Path getPartitionLocation() throws NoSuchObjectException, MetaException, TException {
+    Path partitionLocacation;
+    if (partition.isEmpty()) {
+      partitionLocacation = new Path(table.getSd().getLocation());
+    } else {
+      // TODO: calculate this instead. Just because we're writing to the location doesn't mean that it'll
+      // always be wanted in the meta store right away.
+      List<Partition> partitionEntries = metaStoreClient.listPartitions(table.getDbName(), table.getTableName(),
+          partition, (short) 1);
+      partitionLocacation = new Path(partitionEntries.get(0).getSd().getLocation());
+    }
+    return partitionLocacation;
+  }
+
+  public static class Record {
+    private RecordIdentifier recordIdentifier;
+    private String row;
+
+    Record(RecordIdentifier recordIdentifier, String row) {
+      this.recordIdentifier = recordIdentifier;
+      this.row = row;
+    }
+
+    public RecordIdentifier getRecordIdentifier() {
+      return recordIdentifier;
+    }
+
+    public String getRow() {
+      return row;
+    }
+
+    @Override
+    public String toString() {
+      return "Record [recordIdentifier=" + recordIdentifier + ", row=" + row + "]";
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
new file mode 100644
index 0000000..f8c8537
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java
@@ -0,0 +1,261 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.thrift.TException;
+
+public class StreamingTestUtils {
+
+  /**
+   * Creates a {@link HiveConf} for the streaming tests.
+   * <p>
+   * The configuration maps {@code fs.raw.impl} to {@link RawFileSystem} and switches on
+   * {@code METASTORE_EXECUTE_SET_UGI} and {@code HIVE_SUPPORT_CONCURRENCY}.
+   *
+   * @param metaStoreUri value for {@code METASTOREURIS}; the setting is left untouched when {@code null}
+   * @return the populated configuration
+   */
+  public HiveConf newHiveConf(String metaStoreUri) {
+    HiveConf hiveConf = new HiveConf(getClass());
+    hiveConf.set("fs.raw.impl", RawFileSystem.class.getName());
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    if (metaStoreUri == null) {
+      return hiveConf;
+    }
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    return hiveConf;
+  }
+
+  /**
+   * Readies the transaction database for a test by calling, in this order,
+   * {@code TxnDbUtil.setConfValues(conf)}, {@code TxnDbUtil.cleanDb()} and {@code TxnDbUtil.prepDb()}.
+   * <p>
+   * NOTE(review): presumably this wipes and then recreates the transaction tables - confirm against
+   * {@code TxnDbUtil}.
+   *
+   * @param conf configuration handed to {@code TxnDbUtil.setConfValues}
+   * @throws Exception if any of the {@code TxnDbUtil} calls fails
+   */
+  public void prepareTransactionDatabase(HiveConf conf) throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+  }
+
+  /**
+   * Opens a new {@link HiveMetaStoreClient} for the given configuration.
+   *
+   * @param conf configuration passed to the client constructor
+   * @return the newly constructed client
+   * @throws Exception if the client cannot be constructed
+   */
+  public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception {
+    IMetaStoreClient client = new HiveMetaStoreClient(conf);
+    return client;
+  }
+
+  /**
+   * A {@link RawLocalFileSystem} that reports {@code raw:///} as its URI and derives {@link FileStatus}
+   * values from {@link File} attributes.
+   */
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME = rootUri();
+
+    /** Parses the fixed file system URI, turning the checked syntax failure into an unchecked one. */
+    private static URI rootUri() {
+      try {
+        return new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+    /**
+     * Builds a status for {@code path} from the corresponding local file.
+     * <p>
+     * The permission bits are only an approximation: readable adds {@code 0444}, writable adds {@code 0200}
+     * and executable adds {@code 0111}. Replication (1), block size (1024), owner and group are fixed values,
+     * and the last-modified time is used for both the modification and the access time.
+     *
+     * @throws FileNotFoundException if the local file does not exist
+     */
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File localFile = pathToFile(path);
+      if (!localFile.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough
+      int mode = 0;
+      mode |= localFile.canRead() ? 0444 : 0;
+      mode |= localFile.canWrite() ? 0200 : 0;
+      mode |= localFile.canExecute() ? 0111 : 0;
+      FsPermission permission = FsPermission.createImmutable((short) mode);
+      return new FileStatus(localFile.length(), localFile.isDirectory(), 1, 1024, localFile.lastModified(),
+          localFile.lastModified(), permission, "owen", "users", path);
+    }
+  }
+
+  /**
+   * Starts a {@link DatabaseBuilder} whose databases are located beneath the given warehouse folder.
+   *
+   * @param warehouseFolder folder handed to the builder's constructor
+   * @return a new builder
+   */
+  public static DatabaseBuilder databaseBuilder(File warehouseFolder) {
+    DatabaseBuilder builder = new DatabaseBuilder(warehouseFolder);
+    return builder;
+  }
+
+  /**
+   * Fluent helper that assembles a meta store {@link Database} located beneath a warehouse folder and can
+   * drop and recreate it through an {@link IMetaStoreClient}.
+   */
+  public static class DatabaseBuilder {
+
+    private final Database database;
+    private final File warehouseFolder;
+
+    /**
+     * @param warehouseFolder folder beneath which the database folder is placed
+     */
+    public DatabaseBuilder(File warehouseFolder) {
+      this.warehouseFolder = warehouseFolder;
+      database = new Database();
+    }
+
+    /**
+     * Sets the database name and derives its location URI: {@code raw://} followed by the path of the folder
+     * {@code <name>.db} inside the warehouse folder.
+     *
+     * @param name database name
+     * @return this builder
+     */
+    public DatabaseBuilder name(String name) {
+      database.setName(name);
+      File databaseFolder = new File(warehouseFolder, name + ".db");
+      String databaseLocation = "raw://" + databaseFolder.toURI().getPath();
+      database.setLocationUri(databaseLocation);
+      return this;
+    }
+
+    /**
+     * Attempts to drop every table of the database and the database itself, then creates the database.
+     *
+     * @param metaStoreClient client used for all meta store calls; must not be {@code null}
+     * @return the database definition that was created
+     * @throws IllegalArgumentException if {@code metaStoreClient} is {@code null}
+     * @throws Exception if creating the database fails
+     */
+    public Database dropAndCreate(IMetaStoreClient metaStoreClient) throws Exception {
+      if (metaStoreClient == null) {
+        throw new IllegalArgumentException("metaStoreClient must not be null");
+      }
+      try {
+        for (String table : metaStoreClient.listTableNamesByFilter(database.getName(), "", (short) -1)) {
+          metaStoreClient.dropTable(database.getName(), table, true, true);
+        }
+        metaStoreClient.dropDatabase(database.getName());
+      } catch (TException ignored) {
+        // Best-effort clean-up: a failure here (presumably most often because the database does not exist
+        // yet) is deliberately ignored so that the create below is still attempted.
+      }
+      metaStoreClient.createDatabase(database);
+      return database;
+    }
+
+    /**
+     * @return the database definition assembled so far, without contacting a meta store
+     */
+    public Database build() {
+      return database;
+    }
+
+  }
+
+  /**
+   * Starts a {@link TableBuilder} for a table belonging to the given database.
+   *
+   * @param database database handed to the builder's constructor
+   * @return a new builder
+   */
+  public static TableBuilder tableBuilder(Database database) {
+    TableBuilder builder = new TableBuilder(database);
+    return builder;
+  }
+
+  /**
+   * Fluent helper that assembles a managed {@link Table} flagged {@code transactional=true}, using the ORC
+   * output format and SerDe, and optionally registers it together with its partitions in a meta store.
+   */
+  public static class TableBuilder {
+
+    private final Table table;
+    private final StorageDescriptor sd;
+    private final SerDeInfo serDeInfo;
+    private final Database database;
+    private final List<List<String>> partitions;
+    private final List<String> columnNames;
+    private final List<String> columnTypes;
+    // Replaced wholesale by partitionKeys(String...), hence not final.
+    private List<String> partitionKeys;
+
+    /**
+     * @param database database that owns the table; its name and location URI are read by this builder
+     */
+    public TableBuilder(Database database) {
+      this.database = database;
+      partitions = new ArrayList<>();
+      columnNames = new ArrayList<>();
+      columnTypes = new ArrayList<>();
+      partitionKeys = Collections.emptyList();
+      table = new Table();
+      table.setDbName(database.getName());
+      table.setTableType(TableType.MANAGED_TABLE.toString());
+      Map<String, String> tableParams = new HashMap<>();
+      tableParams.put("transactional", Boolean.TRUE.toString());
+      table.setParameters(tableParams);
+
+      sd = new StorageDescriptor();
+      // NOTE(review): HiveInputFormat is registered here although the output format and SerDe are ORC -
+      // confirm that this is the intended input format.
+      sd.setInputFormat(HiveInputFormat.class.getName());
+      sd.setOutputFormat(OrcOutputFormat.class.getName());
+      sd.setNumBuckets(1);
+      table.setSd(sd);
+
+      serDeInfo = new SerDeInfo();
+      serDeInfo.setParameters(new HashMap<String, String>());
+      serDeInfo.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+      serDeInfo.setSerializationLib(OrcSerde.class.getName());
+      sd.setSerdeInfo(serDeInfo);
+    }
+
+    /**
+     * Sets the table name, names the SerDe info after it, and places the table at
+     * {@code <database location URI>/<name>}.
+     *
+     * @param name table name
+     * @return this builder
+     */
+    public TableBuilder name(String name) {
+      sd.setLocation(database.getLocationUri() + Path.SEPARATOR + name);
+      table.setTableName(name);
+      serDeInfo.setName(name);
+      return this;
+    }
+
+    /**
+     * Overrides the bucket count, which the constructor sets to 1.
+     *
+     * @param buckets number of buckets
+     * @return this builder
+     */
+    public TableBuilder buckets(int buckets) {
+      sd.setNumBuckets(buckets);
+      return this;
+    }
+
+    /**
+     * Appends a column; columns keep the order in which they are added.
+     *
+     * @param columnName column name
+     * @param columnType column type name
+     * @return this builder
+     */
+    public TableBuilder addColumn(String columnName, String columnType) {
+      columnNames.add(columnName);
+      columnTypes.add(columnType);
+      return this;
+    }
+
+    /**
+     * Replaces the partition key names; every key is declared with the string type when the table is built.
+     *
+     * @param partitionKeys partition key names
+     * @return this builder
+     */
+    public TableBuilder partitionKeys(String... partitionKeys) {
+      this.partitionKeys = Arrays.asList(partitionKeys);
+      return this;
+    }
+
+    /**
+     * Declares a partition by its values.
+     *
+     * @param partitionValues values of the partition
+     * @return this builder
+     */
+    public TableBuilder addPartition(String... partitionValues) {
+      partitions.add(Arrays.asList(partitionValues));
+      return this;
+    }
+
+    /**
+     * Declares a partition by its values.
+     *
+     * @param partitionValues values of the partition; copied so that later changes to the caller's list do
+     *          not affect this builder
+     * @return this builder
+     */
+    public TableBuilder addPartition(List<String> partitionValues) {
+      partitions.add(new ArrayList<>(partitionValues));
+      return this;
+    }
+
+    /**
+     * Assembles the table and registers it, followed by each declared partition, through the given client.
+     *
+     * @param metaStoreClient client used for registration; must not be {@code null}
+     * @return the assembled table definition
+     * @throws IllegalArgumentException if {@code metaStoreClient} is {@code null}
+     * @throws Exception if assembling or registering the table or a partition fails
+     */
+    public Table create(IMetaStoreClient metaStoreClient) throws Exception {
+      if (metaStoreClient == null) {
+        throw new IllegalArgumentException("metaStoreClient must not be null");
+      }
+      return internalCreate(metaStoreClient);
+    }
+
+    /**
+     * Assembles the table definition without contacting a meta store.
+     *
+     * @return the assembled table definition
+     * @throws Exception if assembling the definition fails
+     */
+    public Table build() throws Exception {
+      return internalCreate(null);
+    }
+
+    /**
+     * Finalises the table definition and, when a client is supplied, registers the table and then each
+     * declared partition.
+     *
+     * @param metaStoreClient client used for registration, or {@code null} to only assemble the definition
+     * @return the assembled table definition
+     */
+    private Table internalCreate(IMetaStoreClient metaStoreClient) throws Exception {
+      List<FieldSchema> fields = new ArrayList<>(columnNames.size());
+      for (int i = 0; i < columnNames.size(); i++) {
+        fields.add(new FieldSchema(columnNames.get(i), columnTypes.get(i), ""));
+      }
+      sd.setCols(fields);
+
+      if (!partitionKeys.isEmpty()) {
+        List<FieldSchema> partitionFields = new ArrayList<>();
+        for (String partitionKey : partitionKeys) {
+          partitionFields.add(new FieldSchema(partitionKey, serdeConstants.STRING_TYPE_NAME, ""));
+        }
+        table.setPartitionKeys(partitionFields);
+      }
+      // The table is created before its partitions are added.
+      if (metaStoreClient != null) {
+        metaStoreClient.createTable(table);
+      }
+
+      // Partition descriptors are assembled even without a client, so the partition name is still computed.
+      for (List<String> partitionValues : partitions) {
+        Partition partition = new Partition();
+        partition.setDbName(database.getName());
+        partition.setTableName(table.getTableName());
+        // Each partition gets its own descriptor, copy-constructed from the table's, located beneath the table.
+        StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+        partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+            + Warehouse.makePartName(table.getPartitionKeys(), partitionValues));
+        partition.setSd(partitionSd);
+        partition.setValues(partitionValues);
+
+        if (metaStoreClient != null) {
+          metaStoreClient.add_partition(partition);
+        }
+      }
+      return table;
+    }
+  }
+
+}