You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/11 10:11:40 UTC

[11/17] git commit: TAJO-999: SequenceFile key class needs to be compatible. (jaehwa)

TAJO-999: SequenceFile key class needs to be compatible. (jaehwa)

Closes #110


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0603b49d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0603b49d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0603b49d

Branch: refs/heads/index_support
Commit: 0603b49dddb5d2b3c6f6447fa672e0ea03eae09e
Parents: fcc5da0
Author: Jaehwa Jung <bl...@apache.org>
Authored: Mon Aug 11 11:36:18 2014 +0900
Committer: Jaehwa Jung <bl...@apache.org>
Committed: Mon Aug 11 11:36:18 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 ++
 .../sequencefile/SequenceFileAppender.java      | 20 +++++++++++---------
 .../sequencefile/SequenceFileScanner.java       | 20 ++++++++++++++------
 .../tajo/storage/TestCompressionStorages.java   | 12 ++++++++++++
 .../org/apache/tajo/storage/TestStorages.java   | 13 +++++++++++++
 5 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 864eaef..deb41a0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-999: SequenceFile key class need to be compatible. (jaehwa)
+
     TAJO-994: 'count(distinct x)' function counts first null value. (hyunsik)
 
     TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses

http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index b150a9a..86d902a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -25,11 +25,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.tajo.catalog.Schema;
@@ -73,7 +70,8 @@ public class SequenceFileAppender extends FileAppender {
 
   long rowCount;
   private boolean isShuffle;
-  private static final BytesWritable EMPTY_KEY = new BytesWritable();
+
+  private Writable EMPTY_KEY;
 
   public SequenceFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
     super(conf, schema, meta, path);
@@ -128,20 +126,24 @@ public class SequenceFileAppender extends FileAppender {
       throw new IOException(e);
     }
 
-    Class<? extends Writable>  valueClass;
+    Class<? extends Writable>  keyClass, valueClass;
     if (serde instanceof BinarySerializerDeserializer) {
+      keyClass = BytesWritable.class;
+      EMPTY_KEY = new BytesWritable();
       valueClass = BytesWritable.class;
     } else {
+      keyClass = LongWritable.class;
+      EMPTY_KEY = new LongWritable();
       valueClass = Text.class;
     }
 
     String type = this.meta.getOption(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name());
     if (type.equals(CompressionType.BLOCK.name())) {
-      writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.BLOCK, codec);
+      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec);
     } else if (type.equals(CompressionType.RECORD.name())) {
-      writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.RECORD, codec);
+      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec);
     } else {
-      writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.NONE, codec);
+      writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec);
     }
 
     if (enabledStats) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index 32d1d57..3c39841 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -24,10 +24,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -37,6 +35,7 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.ReflectionUtil;
 
 import java.io.IOException;
 
@@ -72,7 +71,7 @@ public class SequenceFileScanner extends FileScanner {
 
   private int elementOffset, elementSize;
 
-  private static final BytesWritable EMPTY_KEY = new BytesWritable();
+  private Writable EMPTY_KEY;
 
   public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
     super(conf, schema, meta, fragment);
@@ -120,8 +119,13 @@ public class SequenceFileScanner extends FileScanner {
       String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
       serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
 
-      if (serde instanceof BinarySerializerDeserializer)
+      if (serde instanceof BinarySerializerDeserializer) {
         hasBinarySerDe = true;
+      }
+
+      Class<? extends Writable> keyClass = (Class<? extends Writable>)Class.forName(reader.getKeyClassName());
+      EMPTY_KEY = keyClass.newInstance();
+
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw new IOException(e);
@@ -129,6 +133,10 @@ public class SequenceFileScanner extends FileScanner {
     super.init();
   }
 
+  public Writable getKey() {
+    return EMPTY_KEY;
+  }
+
   private void prepareProjection(Column [] targets) {
     projectionMap = new int[targets.length];
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 9fe5721..61f4682 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -21,7 +21,10 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -34,6 +37,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestCompressionStorages {
@@ -228,6 +233,13 @@ public class TestCompressionStorages {
       }
     }
     scanner.init();
+
+    if (storeType == StoreType.SEQUENCEFILE) {
+      assertTrue(scanner instanceof SequenceFileScanner);
+      Writable key = ((SequenceFileScanner) scanner).getKey();
+      assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+    }
+
     int tupleCnt = 0;
     Tuple tuple;
     while ((tuple = scanner.next()) != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 1cf1ecf..3bea740 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -21,6 +21,9 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -36,6 +39,7 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -48,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
@@ -640,6 +645,10 @@ public class TestStorages {
     Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+
     Tuple retrieved;
     while ((retrieved=scanner.next()) != null) {
       for (int i = 0; i < tuple.size(); i++) {
@@ -709,6 +718,10 @@ public class TestStorages {
     Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
+
     Tuple retrieved;
     while ((retrieved=scanner.next()) != null) {
       for (int i = 0; i < tuple.size(); i++) {