You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2013/11/12 03:48:17 UTC

svn commit: r1540927 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/ test/resources/org/apache/hadoop/hbase/mapreduce/

Author: jeffreyz
Date: Tue Nov 12 02:48:16 2013
New Revision: 1540927

URL: http://svn.apache.org/r1540927
Log:
HBASE-9895: 0.96 Import utility can't import an exported file from 0.94

Added:
    hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/
    hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format   (with props)
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1540927&r1=1540926&r2=1540927&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Tue Nov 12 02:48:16 2013
@@ -451,6 +451,10 @@ public class Import {
       usage("Wrong number of arguments: " + otherArgs.length);
       System.exit(-1);
     }
+    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
+    if (inputVersionString != null) {
+      conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
+    }
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java?rev=1540927&r1=1540926&r2=1540927&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java Tue Nov 12 02:48:16 2013
@@ -17,18 +17,33 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
 
-public class ResultSerialization implements Serialization<Result> {
+public class ResultSerialization extends Configured implements Serialization<Result> {
+  private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
+  // The following configuration property indicates import file format version.
+  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
+
   @Override
   public boolean accept(Class<?> c) {
     return Result.class.isAssignableFrom(c);
@@ -36,6 +51,16 @@ public class ResultSerialization impleme
 
   @Override
   public Deserializer<Result> getDeserializer(Class<Result> c) {
+    // check input format version
+    Configuration conf = getConf();
+    if (conf != null) {
+      String inputVersion = conf.get(IMPORT_FORMAT_VER);
+      if (inputVersion != null && inputVersion.equals("0.94")) {
+        LOG.info("Load exported file using deserializer for HBase 0.94 format");
+        return new Result94Deserializer();
+      }
+    }
+
     return new ResultDeserializer();
   }
 
@@ -44,6 +69,52 @@ public class ResultSerialization impleme
     return new ResultSerializer();
   }
 
+  /**
+   * The following deserializer class is used to load exported file of 0.94
+   */
+  private static class Result94Deserializer implements Deserializer<Result> {
+    private DataInputStream in;
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public Result deserialize(Result mutation) throws IOException {
+      int totalBuffer = in.readInt();
+      if (totalBuffer == 0) {
+        return Result.EMPTY_RESULT;
+      }
+      byte[] buf = new byte[totalBuffer];
+      readChunked(in, buf, 0, totalBuffer);
+      List<Cell> kvs = new ArrayList<Cell>();
+      int offset = 0;
+      while (offset < totalBuffer) {
+        int keyLength = Bytes.toInt(buf, offset);
+        offset += Bytes.SIZEOF_INT;
+        kvs.add(new KeyValue(buf, offset, keyLength));
+        offset += keyLength;
+      }
+      return Result.create(kvs);
+    }
+
+    @Override
+    public void open(InputStream in) throws IOException {
+      if (!(in instanceof DataInputStream)) {
+        throw new IOException("Wrong input stream instance passed in");
+      }
+      this.in = (DataInputStream) in;
+    }
+
+    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
+      int maxRead = 8192;
+
+      for (; ofs < len; ofs += maxRead)
+        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
+    }
+  }
+
   private static class ResultDeserializer implements Deserializer<Result> {
     private InputStream in;
 
@@ -54,8 +125,7 @@ public class ResultSerialization impleme
 
     @Override
     public Result deserialize(Result mutation) throws IOException {
-      ClientProtos.Result proto =
-          ClientProtos.Result.parseDelimitedFrom(in);
+      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
       return ProtobufUtil.toResult(proto);
     }
 
@@ -63,8 +133,8 @@ public class ResultSerialization impleme
     public void open(InputStream in) throws IOException {
       this.in = in;
     }
-    
   }
+
   private static class ResultSerializer implements Serializer<Result> {
     private OutputStream out;
 

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java?rev=1540927&r1=1540926&r2=1540927&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Tue Nov 12 02:48:16 2013
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -176,7 +177,7 @@ public class TestImportExport {
 
   /**
    * Test export hbase:meta table
-   * 
+   *
    * @throws Exception
    */
   @Test
@@ -187,6 +188,38 @@ public class TestImportExport {
   }
 
   /**
+   * Test import data from 0.94 exported file
+   * @throws Exception
+   */
+  @Test
+  public void testImport94Table() throws Exception {
+    URL url = TestImportExport.class.getResource(
+        "exportedTableIn94Format");
+    Path importPath = new Path(url.getPath());
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
+        + "exportedTableIn94Format"));
+    String IMPORT_TABLE = "importTableExportedFrom94";
+    HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
+    String[] args = new String[] {
+        "-Dhbase.import.version=0.94" ,
+        IMPORT_TABLE, FQ_OUTPUT_DIR
+    };
+    assertTrue(runImport(args));
+
+    /* exportedTableIn94Format contains 5 rows
+     ROW         COLUMN+CELL
+     r1          column=f1:c1, timestamp=1383766761171, value=val1
+     r2          column=f1:c1, timestamp=1383766771642, value=val2
+     r3          column=f1:c1, timestamp=1383766777615, value=val3
+     r4          column=f1:c1, timestamp=1383766785146, value=val4
+     r5          column=f1:c1, timestamp=1383766791506, value=val5
+     */
+    assertEquals(5, UTIL.countRows(t));
+    t.close();
+  }
+
+  /**
    * Test export scanner batching
    */
    @Test

Added: hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format?rev=1540927&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format
------------------------------------------------------------------------------
    svn:executable = *

Propchange: hbase/branches/0.96/hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream