You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2018/07/23 21:34:13 UTC

[2/2] crunch git commit: CRUNCH-619: Update to HBase 2.0.1. Contributed by Attila Sasvari.

CRUNCH-619: Update to HBase 2.0.1. Contributed by Attila Sasvari.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1b2c058c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1b2c058c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1b2c058c

Branch: refs/heads/master
Commit: 1b2c058c4c2b084d895cf6ae86f68ac935282f2c
Parents: ffca004
Author: Josh Wills <jw...@apache.org>
Authored: Mon Jul 23 13:31:00 2018 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jul 23 13:31:00 2018 -0700

----------------------------------------------------------------------
 crunch-examples/pom.xml                         |   7 +-
 .../crunch/examples/WordAggregationHBase.java   |  33 ++---
 crunch-hbase/pom.xml                            |   6 +
 .../apache/crunch/io/hbase/HFileSourceIT.java   |  29 ++--
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  99 ++++++++------
 .../crunch/io/hbase/WordCountHBaseIT.java       |  41 +++---
 .../org/apache/crunch/io/hbase/HBaseData.java   |   1 -
 .../crunch/io/hbase/HBaseSourceTarget.java      |   6 +-
 .../org/apache/crunch/io/hbase/HBaseTypes.java  |   3 +-
 .../crunch/io/hbase/HFileInputFormat.java       |  15 ++-
 .../io/hbase/HFileOutputFormatForCrunch.java    | 132 +++++++------------
 .../crunch/io/hbase/HFileReaderFactory.java     |   7 +-
 .../org/apache/crunch/io/hbase/HFileTarget.java |  18 +--
 .../org/apache/crunch/io/hbase/HFileUtils.java  |  84 ++++++------
 .../apache/crunch/io/hbase/HTableIterable.java  |   1 -
 .../apache/crunch/io/hbase/HTableIterator.java  |   4 +-
 crunch-hcatalog/pom.xml                         |  53 +++++++-
 crunch-spark/pom.xml                            |   9 +-
 .../org/apache/crunch/SparkHFileTargetIT.java   |  92 ++++++++-----
 .../apache/crunch/SparkWordCountHBaseIT.java    |  26 ++--
 pom.xml                                         |  11 +-
 21 files changed, 378 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-examples/pom.xml b/crunch-examples/pom.xml
index e28d553..adc6e9a 100644
--- a/crunch-examples/pom.xml
+++ b/crunch-examples/pom.xml
@@ -50,7 +50,6 @@ under the License.
       <scope>provided</scope>
     </dependency>
 
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -66,6 +65,12 @@ under the License.
 
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
index 5d62d19..b128b7f 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
@@ -40,13 +40,14 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Tool;
@@ -147,11 +148,13 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
    * @throws IOException
    */
   private static void putInHbase(List<Put> putList, Configuration conf) throws IOException {
-    HTable htable = new HTable(conf, TABLE_SOURCE);
+    Connection connection = ConnectionFactory.createConnection(conf);
+    Table htable = connection.getTable(TableName.valueOf(TABLE_SOURCE));
     try {
       htable.put(putList);
     } finally {
       htable.close();
+      connection.close();
     }
   }
 
@@ -161,16 +164,15 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
    * @param conf the hbase configuration
    * @param htableName the table name
    * @param families the column family names
-   * @throws MasterNotRunningException
-   * @throws ZooKeeperConnectionException
    * @throws IOException
    */
-  private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException,
-      IOException {
-    HBaseAdmin hbase = new HBaseAdmin(conf);
+  private static void createTable(Configuration conf, String htableName, String... families) throws IOException {
+    Connection connection = ConnectionFactory.createConnection(conf);
+    Admin hbase = connection.getAdmin();
     try {
-      if (!hbase.tableExists(htableName)) {
-        HTableDescriptor desc = new HTableDescriptor(htableName);
+      TableName tableName = TableName.valueOf(htableName);
+      if (!hbase.tableExists(tableName)) {
+        HTableDescriptor desc = new HTableDescriptor(tableName);
         for (String s : families) {
           HColumnDescriptor meta = new HColumnDescriptor(s);
           desc.addFamily(meta);
@@ -179,6 +181,7 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
       }
     } finally {
       hbase.close();
+      connection.close();
     }
   }
 
@@ -197,8 +200,8 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
     }
     for (int i = 0; i < character.size(); i++) {
       Put put = new Put(Bytes.toBytes(character.get(i)));
-      put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i)));
-      put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i)));
+      put.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i)));
+      put.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i)));
       list.add(put);
     }
     return list;
@@ -238,7 +241,7 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
       @Override
       public void process(Pair<String, String> input, Emitter<Put> emitter) {
         Put put = new Put(Bytes.toBytes(input.first()));
-        put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
+        put.addColumn(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
         emitter.emit(put);
       }
     }, HBaseTypes.puts());

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 53b1199..075b197 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -88,6 +88,12 @@ under the License.
 
     <dependency>
       <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
       <scope>provided</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 6f418a5..d485872 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -35,6 +35,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -126,9 +129,9 @@ public class HFileSourceIT implements Serializable {
     assertEquals(1, results.size());
     Result result = Iterables.getOnlyElement(results);
     assertArrayEquals(ROW1, result.getRow());
-    assertEquals(2, result.raw().length);
-    assertArrayEquals(VALUE1, result.getColumnLatest(FAMILY1, QUALIFIER1).getValue());
-    assertArrayEquals(VALUE2, result.getColumnLatest(FAMILY1, QUALIFIER2).getValue());
+    assertEquals(2, result.rawCells().length);
+    assertArrayEquals(VALUE1, CellUtil.cloneValue(result.getColumnLatestCell(FAMILY1, QUALIFIER1)));
+    assertArrayEquals(VALUE2, CellUtil.cloneValue(result.getColumnLatestCell(FAMILY1, QUALIFIER2)));
   }
 
   @Test
@@ -142,11 +145,11 @@ public class HFileSourceIT implements Serializable {
     List<Result> results = doTestScanHFiles(kvs, scan);
     assertEquals(1, results.size());
     Result result = Iterables.getOnlyElement(results);
-    List<KeyValue> kvs2 = result.getColumn(FAMILY1, QUALIFIER1);
+    List<Cell> kvs2 = result.getColumnCells(FAMILY1, QUALIFIER1);
     assertEquals(3, kvs2.size());
-    assertArrayEquals(VALUE3, kvs2.get(0).getValue());
-    assertArrayEquals(VALUE2, kvs2.get(1).getValue());
-    assertArrayEquals(VALUE1, kvs2.get(2).getValue());
+    assertArrayEquals(VALUE3, CellUtil.cloneValue(kvs2.get(0)));
+    assertArrayEquals(VALUE2, CellUtil.cloneValue(kvs2.get(1)));
+    assertArrayEquals(VALUE1, CellUtil.cloneValue(kvs2.get(2)));
   }
 
   @Test
@@ -173,8 +176,8 @@ public class HFileSourceIT implements Serializable {
     scan.setStartRow(ROW1);
     List<Result> results = doTestScanHFiles(kvs, scan);
     assertEquals(2, results.size());
-    assertArrayEquals(ROW2, kvs.get(0).getRow());
-    assertArrayEquals(ROW3, kvs.get(1).getRow());
+    assertArrayEquals(ROW2, results.get(0).getRow());
+    assertArrayEquals(ROW3, results.get(1).getRow());
   }
 
   //@Test
@@ -214,8 +217,8 @@ public class HFileSourceIT implements Serializable {
     assertEquals(1, results.size());
     Result result = Iterables.getOnlyElement(results);
     assertEquals(2, result.size());
-    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER1));
-    assertNotNull(result.getColumnLatest(FAMILY2, QUALIFIER2));
+    assertNotNull(result.getColumnLatestCell(FAMILY1, QUALIFIER1));
+    assertNotNull(result.getColumnLatestCell(FAMILY2, QUALIFIER2));
   }
 
   @Test
@@ -230,7 +233,7 @@ public class HFileSourceIT implements Serializable {
     assertEquals(1, results.size());
     Result result = Iterables.getOnlyElement(results);
     assertEquals(1, result.size());
-    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
+    assertNotNull(result.getColumnLatestCell(FAMILY1, QUALIFIER2));
   }
 
   @Test
@@ -326,7 +329,7 @@ public class HFileSourceIT implements Serializable {
       FileSystem fs = FileSystem.get(conf);
       w = HFile.getWriterFactory(conf, new CacheConfig(conf))
           .withPath(fs, inputPath)
-          .withComparator(KeyValue.COMPARATOR)
+          .withComparator(CellComparatorImpl.COMPARATOR)
           .withFileContext(new HFileContext())
           .create();
       for (KeyValue kv : sortedKVs) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 9027c1b..ffe2177 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -19,7 +19,6 @@ package org.apache.crunch.io.hbase;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import org.apache.commons.io.IOUtils;
@@ -50,6 +49,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -57,11 +57,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -69,11 +72,11 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
-import org.apache.hadoop.hbase.util.ByteBloomFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -97,13 +100,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.crunch.types.writable.Writables.nulls;
 import static org.apache.crunch.types.writable.Writables.tableOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -152,21 +155,19 @@ public class HFileTargetIT implements Serializable {
     HBASE_TEST_UTILITY.startMiniCluster(1);
   }
 
-  private static HTable createTable(int splits) throws Exception {
+  private static Table createTable(int splits) throws Exception {
     HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     return createTable(splits, hcol);
   }
 
-  private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception {
-    byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000));
-    HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
+  private static Table createTable(int splits, HColumnDescriptor... hcols) throws Exception {
+    TableName tableName = TableName.valueOf(Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000)));
     HTableDescriptor htable = new HTableDescriptor(tableName);
     for (HColumnDescriptor hcol : hcols) {
       htable.addFamily(hcol);
     }
-    admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
-    HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000);
-    return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
+    return HBASE_TEST_UTILITY.createTable(htable,
+        Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
   }
 
   @AfterClass
@@ -196,7 +197,7 @@ public class HFileTargetIT implements Serializable {
 
     FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration());
     KeyValue kv = readFromHFiles(fs, outputPath, "and");
-    assertEquals(375L, Bytes.toLong(kv.getValue()));
+    assertEquals(375L, Bytes.toLong(CellUtil.cloneValue(kv)));
   }
 
   @Test
@@ -206,21 +207,25 @@ public class HFileTargetIT implements Serializable {
     Path outputPath = getTempPathOnHDFS("out");
     byte[] columnFamilyA = Bytes.toBytes("colfamA");
     byte[] columnFamilyB = Bytes.toBytes("colfamB");
-    HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Table testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
+    Connection connection = admin.getConnection();
+    RegionLocator regionLocator = connection.getRegionLocator(testTable.getName());
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
     PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         wordCountPuts,
-        testTable,
+        connection,
+        testTable.getName(),
         outputPath);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
 
     new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration())
-        .doBulkLoad(outputPath, testTable);
+        .doBulkLoad(outputPath, admin, testTable, regionLocator);
 
     Map<String, Long> EXPECTED = ImmutableMap.<String, Long>builder()
         .put("__EMPTY__", 1345L)
@@ -243,8 +248,12 @@ public class HFileTargetIT implements Serializable {
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath1 = getTempPathOnHDFS("out1");
     Path outputPath2 = getTempPathOnHDFS("out2");
-    HTable table1 = createTable(26);
-    HTable table2 = createTable(26);
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Connection connection = admin.getConnection();
+    Table table1 = createTable(26);
+    Table table2 = createTable(26);
+    RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName());
+    RegionLocator regionLocator2 = connection.getRegionLocator(table2.getName());
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
     boolean onlyAffectedRegions = true;
 
@@ -256,19 +265,21 @@ public class HFileTargetIT implements Serializable {
     PTable<String, Long> longWordCounts = longWords.count();
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         convertToPuts(shortWordCounts),
-        table1,
+        connection,
+        table1.getName(),
         outputPath1);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         convertToPuts(longWordCounts),
-        table2,
+        connection,
+        table2.getName(),
         outputPath2,
         onlyAffectedRegions);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
 
-    loader.doBulkLoad(outputPath1, table1);
-    loader.doBulkLoad(outputPath2, table2);
+    loader.doBulkLoad(outputPath1, admin, table1, regionLocator1);
+    loader.doBulkLoad(outputPath2, admin, table2, regionLocator2);
 
     assertEquals(314L, getWordCountFromTable(table1, "of"));
     assertEquals(375L, getWordCountFromTable(table2, "and"));
@@ -282,10 +293,12 @@ public class HFileTargetIT implements Serializable {
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath = getTempPathOnHDFS("out");
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Connection connection = admin.getConnection();
     HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     hcol.setDataBlockEncoding(newBlockEncoding);
     hcol.setBloomFilterType(BloomType.ROWCOL);
-    HTable testTable = createTable(26, hcol);
+    Table testTable = createTable(26, hcol);
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
@@ -293,7 +306,8 @@ public class HFileTargetIT implements Serializable {
     PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         wordCountPuts,
-        testTable,
+        connection,
+        testTable.getName(),
         outputPath);
 
     PipelineResult result = pipeline.run();
@@ -309,11 +323,11 @@ public class HFileTargetIT implements Serializable {
       }
       HFile.Reader reader = null;
       try {
-        reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
+        reader = HFile.createReader(fs, f, new CacheConfig(conf), true, conf);
         assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
 
         BloomType bloomFilterType = BloomType.valueOf(Bytes.toString(
-            reader.loadFileInfo().get(StoreFile.BLOOM_FILTER_TYPE_KEY)));
+            reader.loadFileInfo().get(HStoreFile.BLOOM_FILTER_TYPE_KEY)));
         assertEquals(BloomType.ROWCOL, bloomFilterType);
         DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
         assertNotNull(bloomMeta);
@@ -337,7 +351,10 @@ public class HFileTargetIT implements Serializable {
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath1 = getTempPathOnHDFS("out1");
-    HTable table1 = createTable(26);
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Connection connection = admin.getConnection();
+    Table table1 = createTable(26);
+    RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName());
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
@@ -348,7 +365,8 @@ public class HFileTargetIT implements Serializable {
     PCollection<Put> wordPuts = convertToPuts(count);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
             wordPuts,
-            table1,
+            connection,
+            table1.getName(),
             outputPath1,
             onlyAffectedRegions);
 
@@ -393,7 +411,7 @@ public class HFileTargetIT implements Serializable {
       writtenPartitions.add((BytesWritable) wdc.deepCopy(next));
     }
 
-    ImmutableList<byte[]> startKeys = ImmutableList.copyOf(table1.getStartKeys());
+    ImmutableList<byte[]> startKeys = ImmutableList.copyOf(regionLocator1.getStartKeys());
     // assert that only affected regions were loaded into
     assertTrue(startKeys.size() > writtenPartitions.size());
 
@@ -462,7 +480,7 @@ public class HFileTargetIT implements Serializable {
         long c = input.second();
         Put p = new Put(Bytes.toBytes(w));
         for (byte[] columnFamily : columnFamilies) {
-          p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
+          p.addColumn(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
         }
         return p;
       }
@@ -479,7 +497,7 @@ public class HFileTargetIT implements Serializable {
         }
         long c = input.second();
         Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c));
-        return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null);
+        return Pair.of(KeyValueUtil.copyToNewKeyValue(cell), null);
       }
     }, tableOf(HBaseTypes.keyValues(), nulls()))
         .groupByKey(GroupingOptions.builder()
@@ -503,28 +521,31 @@ public class HFileTargetIT implements Serializable {
   /** Reads the first value on a given row from a bunch of hfiles. */
   private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
     List<KeyValueScanner> scanners = Lists.newArrayList();
-    KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row));
+    KeyValue fakeKV = KeyValueUtil.createFirstOnRow(Bytes.toBytes(row));
     for (FileStatus e : fs.listStatus(mrOutputPath)) {
       Path f = e.getPath();
       if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
         continue;
       }
-      StoreFile.Reader reader = new StoreFile.Reader(
+      StoreFileReader reader = new StoreFileReader(
           fs,
           f,
           new CacheConfig(fs.getConf()),
+          true,
+          new AtomicInteger(),
+          false,
           fs.getConf());
-      StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+      StoreFileScanner scanner = reader.getStoreFileScanner(false, false, false, 0, 0, false);
       scanner.seek(fakeKV); // have to call seek of each underlying scanner, otherwise KeyValueHeap won't work
       scanners.add(scanner);
     }
     assertTrue(!scanners.isEmpty());
-    KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
+    KeyValueScanner kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
     boolean seekOk = kvh.seek(fakeKV);
     assertTrue(seekOk);
     Cell kv = kvh.next();
     kvh.close();
-    return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of());
+    return KeyValueUtil.copyToNewKeyValue(kv);
   }
 
   private static Path copyResourceFileToHDFS(String resourceName) throws IOException {
@@ -551,11 +572,11 @@ public class HFileTargetIT implements Serializable {
     return result.makeQualified(fs);
   }
 
-  private static long getWordCountFromTable(HTable table, String word) throws IOException {
+  private static long getWordCountFromTable(Table table, String word) throws IOException {
     return getWordCountFromTable(table, TEST_FAMILY, word);
   }
 
-  private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException {
+  private static long getWordCountFromTable(Table table, byte[] columnFamily, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
     get.addFamily(columnFamily);
     byte[] value = table.get(get).value();

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 4a06c0f..3de3a80 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -42,13 +42,14 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase;
@@ -101,7 +102,7 @@ public class WordCountHBaseIT {
       @Override
       public void process(Pair<String, Long> input, Emitter<Put> emitter) {
         Put put = new Put(Bytes.toBytes(input.first()));
-        put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second()));
+        put.addColumn(COUNTS_COLFAM, null, Bytes.toBytes(input.second()));
         emitter.emit(put);
       }
 
@@ -123,9 +124,9 @@ public class WordCountHBaseIT {
   @Before
   public void setUp() throws Exception {
     Configuration conf = HBaseConfiguration.create(tmpDir.getDefaultConfiguration());
+    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, tmpDir.getFile("hbase-staging").getAbsolutePath());
     hbaseTestUtil = new HBaseTestingUtility(conf);
-    hbaseTestUtil.startMiniZKCluster();
-    hbaseTestUtil.startMiniHBaseCluster(1, 1);
+    hbaseTestUtil.startMiniCluster();
   }
 
   @Test
@@ -141,8 +142,7 @@ public class WordCountHBaseIT {
 
   @After
   public void tearDown() throws Exception {
-    hbaseTestUtil.shutdownMiniHBaseCluster();
-    hbaseTestUtil.shutdownMiniZKCluster();
+    hbaseTestUtil.shutdownMiniCluster();
   }
 
   public void run(Pipeline pipeline) throws Exception {
@@ -153,20 +153,20 @@ public class WordCountHBaseIT {
 
     Random rand = new Random();
     int postFix = rand.nextInt() & 0x7FFFFFFF;
-    String inputTableName = "crunch_words_" + postFix;
-    String outputTableName = "crunch_counts_" + postFix;
-    String otherTableName = "crunch_other_" + postFix;
-    String joinTableName = "crunch_join_words_" + postFix;
+    TableName inputTableName = TableName.valueOf("crunch_words_" + postFix);
+    TableName outputTableName = TableName.valueOf("crunch_counts_" + postFix);
+    TableName otherTableName = TableName.valueOf("crunch_other_" + postFix);
+    TableName joinTableName = TableName.valueOf("crunch_join_words_" + postFix);
 
-    HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
-    HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
-    HTable otherTable = hbaseTestUtil.createTable(Bytes.toBytes(otherTableName), COUNTS_COLFAM);
+    Table inputTable = hbaseTestUtil.createTable(inputTableName, WORD_COLFAM);
+    Table outputTable = hbaseTestUtil.createTable(outputTableName, COUNTS_COLFAM);
+    Table otherTable = hbaseTestUtil.createTable(otherTableName, COUNTS_COLFAM);
 
     int key = 0;
     key = put(inputTable, key, "cat");
     key = put(inputTable, key, "cat");
     key = put(inputTable, key, "dog");
-    inputTable.flushCommits();
+    inputTable.close();
 
     //Setup scan using multiple scans that simply cut the rows in half.
     Scan scan = new Scan();
@@ -179,7 +179,7 @@ public class WordCountHBaseIT {
 
     HBaseSourceTarget source = null;
     if(clazz == null){
-      source = new HBaseSourceTarget(TableName.valueOf(inputTableName), scan, scan2);
+      source = new HBaseSourceTarget(inputTableName, scan, scan2);
     }else{
       source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2});
     }
@@ -200,14 +200,13 @@ public class WordCountHBaseIT {
     assertIsLong(outputTable, "dog", 1);
 
     // verify we can do joins.
-    HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
+    Table joinTable = hbaseTestUtil.createTable(joinTableName, WORD_COLFAM);
     try {
       key = 0;
       key = put(joinTable, key, "zebra");
       key = put(joinTable, key, "donkey");
       key = put(joinTable, key, "bird");
       key = put(joinTable, key, "horse");
-      joinTable.flushCommits();
     } finally {
       joinTable.close();
     }
@@ -233,14 +232,14 @@ public class WordCountHBaseIT {
     assertDeleted(outputTable, "dog");
   }
 
-  protected int put(HTable table, int key, String value) throws IOException {
+  protected int put(Table table, int key, String value) throws IOException {
     Put put = new Put(Bytes.toBytes(key));
-    put.add(WORD_COLFAM, null, Bytes.toBytes(value));
+    put.addColumn(WORD_COLFAM, null, Bytes.toBytes(value));
     table.put(put);
     return key + 1;
   }
 
-  protected static void assertIsLong(HTable table, String key, long i) throws IOException {
+  protected static void assertIsLong(Table table, String key, long i) throws IOException {
     Get get = new Get(Bytes.toBytes(key));
     get.addFamily(COUNTS_COLFAM);
     Result result = table.get(get);
@@ -250,7 +249,7 @@ public class WordCountHBaseIT {
     assertEquals(i, Bytes.toLong(rawCount));
   }
   
-  protected static void assertDeleted(HTable table, String key) throws IOException {
+  protected static void assertDeleted(Table table, String key) throws IOException {
     Get get = new Get(Bytes.toBytes(key));
     get.addFamily(COUNTS_COLFAM);
     Result result = table.get(get);

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 4ac6c8e..880ab68 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index ede7603..a8c157d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase;
 
 import java.io.IOException;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ReadableData;
@@ -49,7 +50,6 @@ import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -176,11 +176,11 @@ public class HBaseSourceTarget extends HBaseTarget implements
 
   static String convertScanToString(Scan scan) throws IOException {
     ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
-    return Base64.encodeBytes(proto.toByteArray());
+    return Base64.encodeBase64String(proto.toByteArray());
   }
 
   public static Scan convertStringToScan(String string) throws IOException {
-    ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decode(string));
+    ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decodeBase64(string));
     return ProtobufUtil.toScan(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
index 787b9c6..76a06c2 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java
@@ -26,6 +26,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
@@ -103,7 +104,7 @@ public final class HBaseTypes {
   }
 
   public static BytesWritable keyValueToBytes(Cell input) {
-    return keyValueToBytes(KeyValue.cloneAndAddTags(input, ImmutableList.<Tag>of()));
+    return keyValueToBytes(KeyValueUtil.copyToNewKeyValue(input));
   }
 
   public static BytesWritable keyValueToBytes(KeyValue kv) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index b286f51..595e86d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -93,7 +96,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
       Path path = fileSplit.getPath();
       FileSystem fs = path.getFileSystem(conf);
       LOG.info("Initialize HFileRecordReader for {}", path);
-      this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+      this.in = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
 
       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
@@ -129,8 +132,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
           if(LOG.isInfoEnabled()) {
             LOG.info("Seeking to start row {}", Bytes.toStringBinary(startRow));
           }
-          KeyValue kv = KeyValue.createFirstOnRow(startRow);
-          hasNext = seekAtOrAfter(scanner, kv);
+          Cell cell = PrivateCellUtil.createFirstOnRow(startRow, 0, (short) startRow.length);
+          hasNext = seekAtOrAfter(scanner, cell);
         } else {
           LOG.info("Seeking to start");
           hasNext = scanner.seekTo();
@@ -142,7 +145,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
       if (!hasNext) {
         return false;
       }
-      value = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
+      value = KeyValueUtil.copyToNewKeyValue(scanner.getCell());
       if (stopRow != null &&
           Bytes.compareTo(
               value.getRowArray(), value.getRowOffset(), value.getRowLength(),
@@ -185,7 +188,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
 
     // This method is copied from o.a.h.hbase.regionserver.StoreFileScanner, as we don't want
     // to depend on it.
-    private static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
+    private static boolean seekAtOrAfter(HFileScanner s, Cell k)
         throws IOException {
       int result = s.seekTo(k);
       if(result < 0) {
@@ -233,4 +236,4 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     // This file isn't splittable.
     return false;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 50d5a0b..3cb3ce5 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,28 +19,25 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -49,10 +46,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 /**
  * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append}
@@ -66,115 +60,83 @@ import java.net.InetSocketAddress;
  */
 public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> {
 
+  // HCOLUMN_DESCRIPTOR_KEY is no longer used, but left for binary compatibility
   public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor";
+  public static final String HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY = "hbase.hfileoutputformat.column.descriptor.compressiontype";
+  public static final String HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY = "hbase.hfileoutputformat.column.descriptor.datablockencoding";
+  public static final String HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY = "hbase.hfileoutputformat.column.descriptor.bloomfiltertype";
   private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude";
   private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormatForCrunch.class);
 
   private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
-  private final TimeRangeTracker trt = new TimeRangeTracker();
 
   @Override
   public RecordWriter<Object, Cell> getRecordWriter(final TaskAttemptContext context)
       throws IOException, InterruptedException {
     Path outputPath = getDefaultWorkFile(context, "");
-    final Configuration conf = context.getConfiguration();
-    FileSystem fs = new HFileSystem(outputPath.getFileSystem(conf));
+    Configuration conf = context.getConfiguration();
+    FileSystem fs = outputPath.getFileSystem(conf);
 
     final boolean compactionExclude = conf.getBoolean(
         COMPACTION_EXCLUDE_CONF_KEY, false);
 
-    String hcolStr = conf.get(HCOLUMN_DESCRIPTOR_KEY);
-    if (hcolStr == null) {
-      throw new AssertionError(HCOLUMN_DESCRIPTOR_KEY + " is not set in conf");
-    }
-    byte[] hcolBytes;
-    try {
-      hcolBytes = Hex.decodeHex(hcolStr.toCharArray());
-    } catch (DecoderException e) {
-      throw new AssertionError("Bad hex string: " + hcolStr);
-    }
-    HColumnDescriptor hcol = new HColumnDescriptor();
-    hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes)));
     LOG.info("Output path: {}", outputPath);
-    LOG.info("HColumnDescriptor: {}", hcol.toString());
     Configuration noCacheConf = new Configuration(conf);
     noCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-    final StoreFile.WriterBuilder writerBuilder = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs)
-        .withComparator(KeyValue.COMPARATOR)
-        .withFileContext(getContext(hcol))
+    StoreFileWriter.Builder writerBuilder =
+        new StoreFileWriter.Builder(conf, new CacheConfig(noCacheConf), fs)
+        .withComparator(CellComparatorImpl.COMPARATOR)
         .withFilePath(outputPath)
-        .withBloomType(hcol.getBloomFilterType());
+        .withFileContext(getContext(conf));
+    String bloomFilterType = conf.get(HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY);
+    if (bloomFilterType != null) {
+      writerBuilder.withBloomType(BloomType.valueOf(bloomFilterType));
+    }
+    final StoreFileWriter writer = writerBuilder.build();
 
     return new RecordWriter<Object, Cell>() {
 
-      StoreFile.Writer writer = null;
+      long maxSeqId = 0L;
 
       @Override
       public void write(Object row, Cell cell)
           throws IOException {
-
-        if (writer == null) {
-          writer = writerBuilder
-              .withFavoredNodes(getPreferredNodes(conf, cell))
-              .build();
-        }
-
-        KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of());
+        KeyValue copy = KeyValueUtil.copyToNewKeyValue(cell);
         if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
           copy.updateLatestStamp(now);
         }
         writer.append(copy);
-        trt.includeTimestamp(copy);
+        long seqId = cell.getSequenceId();
+        if (seqId > maxSeqId) {
+          maxSeqId = seqId;
+        }
       }
 
       @Override
       public void close(TaskAttemptContext c) throws IOException {
-        if (writer != null) {
-          writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-              Bytes.toBytes(System.currentTimeMillis()));
-          writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-              Bytes.toBytes(context.getTaskAttemptID().toString()));
-          writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
-          writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-              Bytes.toBytes(compactionExclude));
-          writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
-          writer.close();
-        }
+        // true => product of major compaction
+        writer.appendMetadata(maxSeqId, true);
+        writer.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()));
+        writer.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(context.getTaskAttemptID().toString()));
+        writer.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude));
+        writer.close();
       }
     };
   }
 
-  /**
-   * Returns the "preferred" node for the given cell, or null if no preferred node can be found. The "preferred"
-   * node for a cell is defined as the host where the region server is located that is hosting the region that will
-   * contain the given cell.
-   */
-  private InetSocketAddress[] getPreferredNodes(Configuration conf, Cell cell) throws IOException {
-    String regionLocationFilePathStr = conf.get(RegionLocationTable.REGION_LOCATION_TABLE_PATH);
-    if (regionLocationFilePathStr != null) {
-      LOG.debug("Reading region location file from {}", regionLocationFilePathStr);
-      Path regionLocationPath = new Path(regionLocationFilePathStr);
-      try (FSDataInputStream inputStream = regionLocationPath.getFileSystem(conf).open(regionLocationPath)) {
-        RegionLocationTable regionLocationTable = RegionLocationTable.deserialize(inputStream);
-        InetSocketAddress preferredNodeForRow = regionLocationTable.getPreferredNodeForRow(CellUtil.cloneRow(cell));
-        if (preferredNodeForRow != null) {
-          return new InetSocketAddress[] { preferredNodeForRow };
-        } else {
-          return null;
-        }
-      }
-    } else {
-      LOG.warn("No region location file path found in configuration");
-      return null;
+  private HFileContext getContext(Configuration conf) {
+    HFileContextBuilder contextBuilder = new HFileContextBuilder();
+    String compressionType = conf.get(HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY);
+    if (compressionType != null) {
+      contextBuilder.withCompression(HFileWriterImpl.compressionByName(compressionType));
     }
-  }
-
-  private HFileContext getContext(HColumnDescriptor desc) {
-    HFileContext ctxt = new HFileContext();
-    ctxt.setDataBlockEncoding(desc.getDataBlockEncoding());
-    ctxt.setCompression(desc.getCompression());
-    return ctxt;
+    String dataBlockEncoding = conf.get(HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY);
+    if (dataBlockEncoding != null) {
+      contextBuilder.withDataBlockEncoding(DataBlockEncoding.valueOf(dataBlockEncoding));
+    }
+    return contextBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
index 14e6118..29af019 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -41,7 +42,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
     Configuration conf = fs.getConf();
     CacheConfig cacheConfig = new CacheConfig(conf);
     try {
-      HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig, conf);
+      HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig, true, conf);
       HFileScanner scanner = hfr.getScanner(
           conf.getBoolean(HFILE_SCANNER_CACHE_BLOCKS, false),
           conf.getBoolean(HFILE_SCANNER_PREAD, false));
@@ -59,7 +60,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
 
     public HFileIterator(HFileScanner scanner) {
       this.scanner = scanner;
-      this.curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
+      this.curr = KeyValueUtil.copyToNewKeyValue(scanner.getCell());
     }
 
     @Override
@@ -72,7 +73,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
       KeyValue ret = curr;
       try {
         if (scanner.next()) {
-          curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of());
+          curr = KeyValueUtil.copyToNewKeyValue(scanner.getCell());
         } else {
           curr = null;
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 8593a76..b1ce5ba 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,8 +17,6 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.Converter;
@@ -31,26 +29,28 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Job;
 
 public class HFileTarget extends FileTargetImpl {
 
-  private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
-
   public HFileTarget(String path) {
     this(new Path(path));
   }
 
   public HFileTarget(Path path) {
-    this(path, DEFAULT_COLUMN_DESCRIPTOR);
+    this(path, null);
   }
 
   public HFileTarget(Path path, HColumnDescriptor hcol) {
     super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance());
-    Preconditions.checkNotNull(hcol);
-    outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY,
-        Hex.encodeHexString(WritableUtils.toByteArray(hcol)));
+    if (hcol != null) {
+      outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY,
+          hcol.getCompressionType().getName());
+      outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY,
+          hcol.getDataBlockEncoding().name());
+      outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY,
+          hcol.getBloomFilterType().name());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 0db536b..d85481d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -59,12 +59,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
@@ -117,7 +119,7 @@ public final class HFileUtils {
     }
 
     private int compareType(KeyValue l, KeyValue r) {
-      return (int) r.getType() - (int) l.getType();
+      return (int) r.getTypeByte() - (int) l.getTypeByte();
     }
 
   };
@@ -272,9 +274,9 @@ public final class HFileUtils {
       Cell leftKey = new KeyValue(left, loffset + 8, llength - 8);
       Cell rightKey = new KeyValue(right, roffset + 8, rlength - 8);
 
-      byte[] lRow = leftKey.getRow();
-      byte[] rRow = rightKey.getRow();
-      int rowCmp = Bytes.compareTo(lRow, rRow);
+      int rowCmp = Bytes.compareTo(
+          leftKey.getRowArray(), leftKey.getRowOffset(), leftKey.getRowLength(),
+          rightKey.getRowArray(), rightKey.getRowOffset(), rightKey.getRowLength());
       if (rowCmp != 0) {
         return rowCmp;
       } else {
@@ -360,7 +362,7 @@ public final class HFileUtils {
             List<KeyValue> cells = Lists.newArrayList();
             for (Cell kv : input.second()) {
               try {
-                cells.add(KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of())); // assuming the input fits into memory
+                cells.add(KeyValueUtil.copyToNewKeyValue(kv)); // assuming the input fits in memory
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
@@ -376,9 +378,10 @@ public final class HFileUtils {
 
   public static <C extends Cell> void writeToHFilesForIncrementalLoad(
           PCollection<C> cells,
-          HTable table,
+          Connection connection,
+          TableName tableName,
           Path outputPath) throws IOException {
-    writeToHFilesForIncrementalLoad(cells, table, outputPath, false);
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false);
   }
 
   /**
@@ -392,18 +395,21 @@ public final class HFileUtils {
    */
   public static <C extends Cell> void writeToHFilesForIncrementalLoad(
       PCollection<C> cells,
-      HTable table,
+      Connection connection,
+      TableName tableName,
       Path outputPath,
       boolean limitToAffectedRegions) throws IOException {
+    Table table = connection.getTable(tableName);
+    RegionLocator regionLocator = connection.getRegionLocator(tableName);
     HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
     if (families.length == 0) {
       LOG.warn("{} has no column families", table);
       return;
     }
-    PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions);
+    PCollection<C> partitioned = sortAndPartition(cells, regionLocator, limitToAffectedRegions);
     RegionLocationTable regionLocationTable = RegionLocationTable.create(
         table.getName().getNameAsString(),
-        ((RegionLocator) table).getAllRegionLocations());
+        regionLocator.getAllRegionLocations());
     Path regionLocationFilePath = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(),
         "regionLocations" + table.getName().getNameAsString());
      writeRegionLocationTable(cells.getPipeline().getConfiguration(), regionLocationFilePath, regionLocationTable);
@@ -420,9 +426,10 @@ public final class HFileUtils {
 
   public static void writePutsToHFilesForIncrementalLoad(
           PCollection<Put> puts,
-          HTable table,
+          Connection connection,
+          TableName tableName,
           Path outputPath) throws IOException {
-    writePutsToHFilesForIncrementalLoad(puts, table, outputPath, false);
+    writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false);
   }
 
   /**
@@ -436,7 +443,8 @@ public final class HFileUtils {
    */
   public static void writePutsToHFilesForIncrementalLoad(
       PCollection<Put> puts,
-      HTable table,
+      Connection connection,
+      TableName tableName,
       Path outputPath,
       boolean limitToAffectedRegions) throws IOException {
     PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() {
@@ -447,21 +455,21 @@ public final class HFileUtils {
         }
       }
     }, HBaseTypes.cells());
-    writeToHFilesForIncrementalLoad(cells, table, outputPath, limitToAffectedRegions);
+    writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions);
   }
 
-  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
-    return sortAndPartition(cells, table, false);
+  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator) throws IOException {
+    return sortAndPartition(cells, regionLocator, false);
   }
 
   /**
-   * Sorts and partitions the provided <code>cells</code> for the given <code>table</code> to ensure all elements that belong
+   * Sorts and partitions the provided <code>cells</code> for the given <code>regionLocator</code> to ensure all elements that belong
    * in the same region end up in the same reducer. The flag <code>limitToAffectedRegions</code>, when set to true, will identify
    * the regions the data in <code>cells</code> belongs to and will set the number of reducers equal to the number of identified
    * affected regions. If set to false, then all regions will be used, and the number of reducers will be set to the number
    * of regions in the table.
    */
-  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table, boolean limitToAffectedRegions) throws IOException {
+  public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator, boolean limitToAffectedRegions) throws IOException {
     Configuration conf = cells.getPipeline().getConfiguration();
     PTable<C, Void> t = cells.parallelDo(
         "Pre-partition",
@@ -474,9 +482,9 @@ public final class HFileUtils {
 
     List<KeyValue> splitPoints;
     if(limitToAffectedRegions) {
-      splitPoints = getSplitPoints(table, t);
+      splitPoints = getSplitPoints(regionLocator, t);
     } else {
-      splitPoints = getSplitPoints(table);
+      splitPoints = getSplitPoints(regionLocator);
     }
     Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
@@ -489,10 +497,10 @@ public final class HFileUtils {
     return t.groupByKey(options).ungroup().keys();
   }
 
-  private static List<KeyValue> getSplitPoints(HTable table) throws IOException {
-    List<byte[]> startKeys = ImmutableList.copyOf(table.getStartKeys());
+  private static List<KeyValue> getSplitPoints(RegionLocator regionLocator) throws IOException {
+    List<byte[]> startKeys = ImmutableList.copyOf(regionLocator.getStartKeys());
     if (startKeys.isEmpty()) {
-      throw new AssertionError(table + " has no regions!");
+      throw new AssertionError(regionLocator.getName().getNameAsString() + " has no regions!");
     }
     List<KeyValue> splitPoints = Lists.newArrayList();
     for (byte[] startKey : startKeys.subList(1, startKeys.size())) {
@@ -503,12 +511,12 @@ public final class HFileUtils {
     return splitPoints;
   }
 
-  private static <C> List<KeyValue> getSplitPoints(HTable table, PTable<C, Void> affectedRows) throws IOException {
+  private static <C> List<KeyValue> getSplitPoints(RegionLocator regionLocator, PTable<C, Void> affectedRows) throws IOException {
     List<byte[]> startKeys;
     try {
-      startKeys = Lists.newArrayList(table.getStartKeys());
+      startKeys = Lists.newArrayList(regionLocator.getStartKeys());
       if (startKeys.isEmpty()) {
-        throw new AssertionError(table + " has no regions!");
+        throw new AssertionError(regionLocator.getName().getNameAsString() + " has no regions!");
       }
     } catch (IOException e) {
       throw new CrunchRuntimeException(e);
@@ -604,8 +612,8 @@ public final class HFileUtils {
     if (kvs.isEmpty()) {
       return null;
     }
-    if (kvs.size() == 1 && kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
-      return new Result(kvs);
+    if (kvs.size() == 1 && kvs.get(0).getTypeByte() == KeyValue.Type.Put.getCode()) {
+      return Result.create(Collections.<Cell>singletonList(kvs.get(0)));
     }
 
     kvs = maybeDeleteFamily(kvs);
@@ -613,7 +621,7 @@ public final class HFileUtils {
     // In-place sort KeyValues by family, qualifier and then timestamp reversely (whenever ties, deletes appear first).
     Collections.sort(kvs, KEY_VALUE_COMPARATOR);
 
-    List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
+    List<Cell> results = Lists.newArrayListWithCapacity(kvs.size());
     for (int i = 0, j; i < kvs.size(); i = j) {
       j = i + 1;
       while (j < kvs.size() && hasSameFamilyAndQualifier(kvs.get(i), kvs.get(j))) {
@@ -624,7 +632,7 @@ public final class HFileUtils {
     if (results.isEmpty()) {
       return null;
     }
-    return new Result(results);
+    return Result.create(results);
   }
 
   /**
@@ -634,7 +642,7 @@ public final class HFileUtils {
   private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
     long deleteFamilyCut = -1;
     for (KeyValue kv : kvs) {
-      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+      if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
         deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
       }
     }
@@ -643,7 +651,7 @@ public final class HFileUtils {
     }
     List<KeyValue> results = Lists.newArrayList();
     for (KeyValue kv : kvs) {
-      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+      if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
         continue;
       }
       if (kv.getTimestamp() <= deleteFamilyCut) {
@@ -675,7 +683,7 @@ public final class HFileUtils {
     if (kvs.isEmpty()) {
       return kvs;
     }
-    if (kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
+    if (kvs.get(0).getTypeByte() == KeyValue.Type.Put.getCode()) {
       return kvs; // shortcut for the common case
     }
 
@@ -685,16 +693,16 @@ public final class HFileUtils {
       if (results.size() >= versions) {
         break;
       }
-      if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
+      if (kv.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
         break;
-      } else if (kv.getType() == KeyValue.Type.Put.getCode()) {
+      } else if (kv.getTypeByte() == KeyValue.Type.Put.getCode()) {
         if (kv.getTimestamp() != previousDeleteTimestamp) {
           results.add(kv);
         }
-      } else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
+      } else if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
         previousDeleteTimestamp = kv.getTimestamp();
       } else {
-        throw new AssertionError("Unexpected KeyValue type: " + kv.getType());
+        throw new AssertionError("Unexpected KeyValue type: " + kv.getTypeByte());
       }
     }
     return results;

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
index c772515..0657a01 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -21,7 +21,6 @@ package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Pair;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
index ebef5d3..647eea4 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
@@ -71,12 +71,12 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
         try {
           table.close();
         } catch (IOException e) {
-          LOG.error("Exception closing HTable: {}", table.getName(), e);
+          LOG.error("Exception closing Table: {}", table.getName(), e);
         }
         try {
           connection.close();
         } catch (IOException e) {
-          LOG.error("Exception closing HTable: {}", table.getName(), e);
+          LOG.error("Exception closing Connection for Table: {}", table.getName(), e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/pom.xml b/crunch-hcatalog/pom.xml
index e99814b..59ebe45 100644
--- a/crunch-hcatalog/pom.xml
+++ b/crunch-hcatalog/pom.xml
@@ -37,11 +37,51 @@ under the License.
     <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+        <exclusions>
+            <exclusion>  <!-- declare the exclusion here -->
+                <groupId>org.eclipse.jetty.aggregate</groupId>
+                <artifactId>jetty-all</artifactId>
+            </exclusion>
+             <exclusion>  <!-- declare the exclusion here -->
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>jetty</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>javax.servlet</groupId>
+                <artifactId>javax.servlet-api</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.glassfish</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.eclipse.jetty.orbit</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+        </exclusions> 
+     </dependency>
+
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <exclusions>
+            <exclusion>  <!-- declare the exclusion here -->
+                <groupId>org.mortbay.jetty</groupId>
+                <artifactId>jetty</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>javax.servlet</groupId>
+                <artifactId>javax.servlet-api</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.glassfish</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.eclipse.jetty.orbit</groupId>
+                <artifactId>javax.servlet</artifactId>
+            </exclusion>
+        </exclusions> 
     </dependency>
 
     <dependency>
@@ -125,8 +165,7 @@ under the License.
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-hbase-handler</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
+      <version>3.0.0</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index 233bb34..34189dc 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -51,8 +51,8 @@ under the License.
       <scope>provided</scope>
       <exclusions>
         <exclusion>
-	  <groupId>javax.servlet</groupId>
-	  <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
         </exclusion>
         <exclusion>
           <groupId>com.sun.jersey</groupId>
@@ -95,6 +95,11 @@ under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-mapreduce</artifactId>
+      <scope>provided</scope>
+    </dependency>
+     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java
index 815aaff..ab0b061 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java
@@ -36,24 +36,29 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
@@ -72,6 +77,7 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.crunch.types.writable.Writables.nulls;
 import static org.apache.crunch.types.writable.Writables.tableOf;
@@ -125,21 +131,19 @@ public class SparkHFileTargetIT implements Serializable {
     HBASE_TEST_UTILITY.startMiniCluster(1);
   }
 
-  private static HTable createTable(int splits) throws Exception {
+  private static Table createTable(int splits) throws Exception {
     HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     return createTable(splits, hcol);
   }
 
-  private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception {
-    byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000));
-    HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
+  private static Table createTable(int splits, HColumnDescriptor... hcols) throws Exception {
+    TableName tableName = TableName.valueOf(Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000)));
     HTableDescriptor htable = new HTableDescriptor(tableName);
     for (HColumnDescriptor hcol : hcols) {
       htable.addFamily(hcol);
     }
-    admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
-    HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000);
-    return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
+    return HBASE_TEST_UTILITY.createTable(htable,
+        Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
   }
 
   @AfterClass
@@ -170,7 +174,7 @@ public class SparkHFileTargetIT implements Serializable {
 
     FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration());
     KeyValue kv = readFromHFiles(fs, outputPath, "and");
-    assertEquals(375L, Bytes.toLong(kv.getValue()));
+    assertEquals(375L, Bytes.toLong(CellUtil.cloneValue(kv)));
     pipeline.done();
   }
 
@@ -182,21 +186,25 @@ public class SparkHFileTargetIT implements Serializable {
     Path outputPath = getTempPathOnHDFS("out");
     byte[] columnFamilyA = Bytes.toBytes("colfamA");
     byte[] columnFamilyB = Bytes.toBytes("colfamB");
-    HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Table testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
+    Connection connection = admin.getConnection();
+    RegionLocator regionLocator = connection.getRegionLocator(testTable.getName());
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
     PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
             wordCountPuts,
-            testTable,
+            connection, // reuse the Connection already obtained for the RegionLocator above
+            testTable.getName(),
             outputPath);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
 
     new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration())
-            .doBulkLoad(outputPath, testTable);
+            .doBulkLoad(outputPath, admin, testTable, regionLocator);
 
     Map<String, Long> EXPECTED = ImmutableMap.<String, Long>builder()
             .put("__EMPTY__", 1345L)
@@ -221,8 +229,12 @@ public class SparkHFileTargetIT implements Serializable {
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath1 = getTempPathOnHDFS("out1");
     Path outputPath2 = getTempPathOnHDFS("out2");
-    HTable table1 = createTable(26);
-    HTable table2 = createTable(26);
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
+    Table table1 = createTable(26);
+    Table table2 = createTable(26);
+    Connection connection = admin.getConnection();
+    RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName());
+    RegionLocator regionLocator2 = connection.getRegionLocator(table2.getName());
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
@@ -233,18 +245,20 @@ public class SparkHFileTargetIT implements Serializable {
     PTable<String, Long> longWordCounts = longWords.count();
     HFileUtils.writePutsToHFilesForIncrementalLoad(
             convertToPuts(shortWordCounts),
-            table1,
+            connection,
+            table1.getName(),
             outputPath1);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
             convertToPuts(longWordCounts),
-            table2,
+            connection,
+            table2.getName(), // outputPath2 is bulk-loaded into table2, so partition by table2's regions
             outputPath2);
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
 
-    loader.doBulkLoad(outputPath1, table1);
-    loader.doBulkLoad(outputPath2, table2);
+    loader.doBulkLoad(outputPath1, admin, table1, regionLocator1);
+    loader.doBulkLoad(outputPath2, admin, table2, regionLocator2);
 
     assertEquals(314L, getWordCountFromTable(table1, "of"));
     assertEquals(375L, getWordCountFromTable(table2, "and"));
@@ -260,9 +274,11 @@ public class SparkHFileTargetIT implements Serializable {
         SparkHFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath = getTempPathOnHDFS("out");
+    Admin admin = HBASE_TEST_UTILITY.getAdmin();
     HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     hcol.setDataBlockEncoding(newBlockEncoding);
-    HTable testTable = createTable(26, hcol);
+    Table testTable = createTable(26, hcol);
+    Connection connection = admin.getConnection();
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
@@ -270,7 +286,8 @@ public class SparkHFileTargetIT implements Serializable {
     PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
             wordCountPuts,
-            testTable,
+            connection,
+            testTable.getName(),
             outputPath);
 
     PipelineResult result = pipeline.run();
@@ -286,7 +303,7 @@ public class SparkHFileTargetIT implements Serializable {
       }
       HFile.Reader reader = null;
       try {
-        reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
+        reader = HFile.createReader(fs, f, new CacheConfig(conf), true, conf);
         assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
       } finally {
         if (reader != null) {
@@ -314,7 +331,7 @@ public class SparkHFileTargetIT implements Serializable {
         long c = input.second();
         Put p = new Put(Bytes.toBytes(w));
         for (byte[] columnFamily : columnFamilies) {
-          p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
+          p.addColumn(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
         }
         return p;
       }
@@ -331,7 +348,7 @@ public class SparkHFileTargetIT implements Serializable {
         }
         long c = input.second();
         Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c));
-        return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null);
+        return Pair.of(KeyValueUtil.copyToNewKeyValue(cell), null);
       }
     }, tableOf(HBaseTypes.keyValues(), nulls()))
             .groupByKey(GroupingOptions.builder()
@@ -355,28 +372,31 @@ public class SparkHFileTargetIT implements Serializable {
   /** Reads the first value on a given row from a bunch of hfiles. */
   private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
     List<KeyValueScanner> scanners = Lists.newArrayList();
-    KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row));
+    KeyValue fakeKV = KeyValueUtil.createFirstOnRow(Bytes.toBytes(row));
     for (FileStatus e : fs.listStatus(mrOutputPath)) {
       Path f = e.getPath();
       if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
         continue;
       }
-      StoreFile.Reader reader = new StoreFile.Reader(
-              fs,
-              f,
-              new CacheConfig(fs.getConf()),
-              fs.getConf());
-      StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+      StoreFileReader reader = new StoreFileReader(
+          fs,
+          f,
+          new CacheConfig(fs.getConf()),
+          true,
+          new AtomicInteger(),
+          false,
+          fs.getConf());
+      StoreFileScanner scanner = reader.getStoreFileScanner(false, false, false, 0, 0, false);
       scanner.seek(fakeKV); // have to call seek of each underlying scanner, otherwise KeyValueHeap won't work
       scanners.add(scanner);
     }
     assertTrue(!scanners.isEmpty());
-    KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
+    KeyValueScanner kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
     boolean seekOk = kvh.seek(fakeKV);
     assertTrue(seekOk);
     Cell kv = kvh.next();
     kvh.close();
-    return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of());
+    return KeyValueUtil.copyToNewKeyValue(kv);
   }
 
   private static Path copyResourceFileToHDFS(String resourceName) throws IOException {
@@ -403,11 +423,11 @@ public class SparkHFileTargetIT implements Serializable {
     return result.makeQualified(fs);
   }
 
-  private static long getWordCountFromTable(HTable table, String word) throws IOException {
+  private static long getWordCountFromTable(Table table, String word) throws IOException {
     return getWordCountFromTable(table, TEST_FAMILY, word);
   }
 
-  private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException {
+  private static long getWordCountFromTable(Table table, byte[] columnFamily, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
     get.addFamily(columnFamily);
     byte[] value = table.get(get).value();