You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/08/02 17:20:59 UTC

[3/3] git commit: CRUNCH-212: Target wrapper for HFileOuptutFormat

CRUNCH-212: Target wrapper for HFileOuptutFormat


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/92ea0592
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/92ea0592
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/92ea0592

Branch: refs/heads/master
Commit: 92ea0592f10f5cfaa6a45372a92eac4b6d2e8623
Parents: d4a0696
Author: Chao Shi <ch...@apache.org>
Authored: Fri Aug 2 23:18:21 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Fri Aug 2 23:19:46 2013 +0800

----------------------------------------------------------------------
 crunch-hbase/pom.xml                            |    5 +
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  248 ++
 crunch-hbase/src/it/resources/log4j.properties  |   29 +
 crunch-hbase/src/it/resources/shakes.txt        | 3667 ++++++++++++++++++
 .../io/hbase/HFileOutputFormatForCrunch.java    |  133 +
 .../org/apache/crunch/io/hbase/HFileTarget.java |   40 +
 .../org/apache/crunch/io/hbase/HFileUtils.java  |  161 +
 .../org/apache/crunch/io/hbase/ToHBase.java     |    8 +
 8 files changed, 4291 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 828a38f..95b3f38 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -40,6 +40,11 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
new file mode 100644
index 0000000..86d965b
--- /dev/null
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class HFileTargetIT implements Serializable {
+
+  private static final HBaseTestingUtility HBASE_TEST_UTILITY = new HBaseTestingUtility();
+  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("test_family");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("count");
+  private static final Path TEMP_DIR = new Path("/tmp");
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    // We have to use mini mapreduce cluster, because LocalJobRunner allows only a single reducer
+    // (we will need it to test bulk load against multiple regions).
+    HBASE_TEST_UTILITY.startMiniCluster();
+    HBASE_TEST_UTILITY.startMiniMapReduceCluster();
+    HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    HTableDescriptor htable = new HTableDescriptor(TEST_TABLE);
+    htable.addFamily(hcol);
+    byte[][] splits = new byte[26][];
+    for (int i = 0; i < 26; i++) {
+      byte b = (byte)('a' + i);
+      splits[i] = new byte[] { b };
+    }
+    admin.createTable(htable, splits);
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    HBASE_TEST_UTILITY.shutdownMiniMapReduceCluster();
+    HBASE_TEST_UTILITY.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration());
+    fs.delete(TEMP_DIR, true);
+    HBASE_TEST_UTILITY.truncateTable(TEST_TABLE);
+  }
+
+  @Test
+  public void testHFileTarget() throws IOException {
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath = getTempPathOnHDFS("out");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    PTable<String,Long> wordCounts = words.count();
+    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
+    pipeline.write(wordCountKvs, ToHBase.hfile(outputPath));
+
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    FileSystem fs = FileSystem.get(conf);
+    KeyValue kv = readFromHFiles(fs, outputPath, "and");
+    assertEquals(427L, Bytes.toLong(kv.getValue()));
+  }
+
+  @Test
+  public void testBulkLoad() throws Exception {
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath = getTempPathOnHDFS("out");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    PTable<String,Long> wordCounts = words.count();
+    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
+    HTable testTable = new HTable(HBASE_TEST_UTILITY.getConfiguration(), TEST_TABLE);
+    HFileUtils.writeToHFilesForIncrementalLoad(
+        wordCountKvs,
+        testTable,
+        outputPath);
+
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration())
+        .doBulkLoad(outputPath, testTable);
+
+    Map<String, Long> EXPECTED = ImmutableMap.<String, Long>builder()
+        .put("", 1470L)
+        .put("the", 620L)
+        .put("and", 427L)
+        .put("of", 396L)
+        .put("to", 367L)
+        .build();
+    for (Map.Entry<String, Long> e : EXPECTED.entrySet()) {
+      assertEquals((long) e.getValue(), Bytes.toLong(
+          testTable.get(new Get(Bytes.toBytes(e.getKey()))).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()));
+    }
+  }
+
+  private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
+    return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
+      @Override
+      public KeyValue map(Pair<String, Long> input) {
+        String w = input.first();
+        long c = input.second();
+        KeyValue kv = new KeyValue(
+            Bytes.toBytes(w),
+            TEST_FAMILY,
+            TEST_QUALIFIER,
+            Bytes.toBytes(c));
+        return kv;
+      }
+    }, Writables.writables(KeyValue.class));
+  }
+
+  private PCollection<String> split(PCollection<String> in, final String regex) {
+    return in.parallelDo(new DoFn<String, String>() {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        for (String w : input.split(regex)) {
+          emitter.emit(w);
+        }
+      }
+    }, Writables.strings());
+  }
+
+  /** Reads the first value on a given row from a bunch of hfiles. */
+  private KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException {
+    List<KeyValueScanner> scanners = Lists.newArrayList();
+    KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row));
+    for (FileStatus e : fs.listStatus(mrOutputPath)) {
+      Path f = e.getPath();
+      if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
+        continue;
+      }
+      StoreFile.Reader reader = new StoreFile.Reader(
+          fs,
+          f,
+          new CacheConfig(fs.getConf()),
+          DataBlockEncoding.NONE);
+      StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+      scanner.seek(fakeKV); // have to call seek of each underlying scanner, otherwise KeyValueHeap won't work
+      scanners.add(scanner);
+    }
+    assertTrue(!scanners.isEmpty());
+    KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
+    boolean seekOk = kvh.seek(fakeKV);
+    assertTrue(seekOk);
+    KeyValue kv = kvh.next();
+    kvh.close();
+    return kv;
+  }
+
+  private Path copyResourceFileToHDFS(String resourceName) throws IOException {
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    Path resultPath = getTempPathOnHDFS(resourceName);
+    InputStream in = null;
+    OutputStream out = null;
+    try {
+      in = Resources.getResource(resourceName).openConnection().getInputStream();
+      out = fs.create(resultPath);
+      IOUtils.copy(in, out);
+    } finally {
+      IOUtils.closeQuietly(in);
+      IOUtils.closeQuietly(out);
+    }
+    return resultPath;
+  }
+
+  private Path getTempPathOnHDFS(String fileName) throws IOException {
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    Path result = new Path(TEMP_DIR, fileName);
+    return result.makeQualified(fs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/92ea0592/crunch-hbase/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/resources/log4j.properties b/crunch-hbase/src/it/resources/log4j.properties
new file mode 100644
index 0000000..5d144a0
--- /dev/null
+++ b/crunch-hbase/src/it/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set root logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# Log warnings on Hadoop for the local runner when testing
+log4j.logger.org.apache.hadoop=warn, A
+# Except for Configuration, which is chatty.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n