You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/01/26 08:38:20 UTC

svn commit: r903110 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Author: nzhang
Date: Tue Jan 26 07:38:20 2010
New Revision: 903110

URL: http://svn.apache.org/viewvc?rev=903110&view=rev
Log:
HIVE-1088: RCFileRecordReader's first split will read duplicate rows if the split end is < the first SYNC mark (Yongqiang He via Ning Zhang)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Jan 26 07:38:20 2010
@@ -1504,3 +1504,7 @@
 
     HIVE-283. Do case insensitive comparison of aliases in partition
     pruning. (athusoo)
+
+    HIVE-1088. RCFileRecordReader's first split will read duplicate rows if
+    the split end is < the first sync mark (Yongqiang He via Ning Zhang)
+

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Jan 26 07:38:20 2010
@@ -926,7 +926,9 @@
     private final byte[] sync = new byte[SYNC_HASH_SIZE];
     private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
     private boolean syncSeen;
+    private long lastSeenSyncPos = 0;
 
+    private long headerEnd;
     private final long end;
     private int currentKeyLength;
     private int currentRecordLength;
@@ -1098,6 +1100,7 @@
 
       if (version > 1) { // if version > 1
         in.readFully(sync); // read sync bytes
+        headerEnd = in.getPos();
       }
     }
 
@@ -1126,6 +1129,15 @@
         seek(end);
         return;
       }
+      
+      // This handles sync(pos) calls where pos < headerEnd.
+      if (position < headerEnd) {
+        // seek directly to first record
+        in.seek(headerEnd);
+        // note the sync marker "seen" in the header
+        syncSeen = true;
+        return;
+      }
 
       try {
         seek(position + 4); // skip escape
@@ -1184,6 +1196,7 @@
       if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process
         // a
         // sync entry
+        lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
         in.readFully(syncCheck); // read syncCheck
         if (!Arrays.equals(sync, syncCheck)) {
           throw new IOException("File is corrupt!");
@@ -1440,6 +1453,11 @@
     public boolean syncSeen() {
       return syncSeen;
     }
+    
+    /** Returns the file position of the most recent sync entry encountered while reading (initially 0). */
+    public long lastSeenSyncPos() {
+      return lastSeenSyncPos;
+    }
 
     /** Returns the name of the file. */
     @Override

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Tue Jan 26 07:38:20 2010
@@ -72,28 +72,15 @@
         conf);
   }
 
-  private boolean firstSeen = true;
-
   @Override
   public boolean next(LongWritable key, BytesRefArrayWritable value)
       throws IOException {
-    if (!more) {
-      return false;
-    }
-    long pos = in.getPosition();
-    boolean hasMore = in.next(key);
-    if (hasMore) {
+    
+    more = next(key);
+    
+    if (more) {
       in.getCurrentRow(value);
     }
-    if (pos >= end && in.syncSeen() && !in.hasRecordsInBuffer()) {
-      more = false;
-      if (firstSeen) {
-        firstSeen = false;
-        return true;
-      }
-    } else {
-      more = hasMore;
-    }
     return more;
   }
 
@@ -101,12 +88,16 @@
     if (!more) {
       return false;
     }
-    long pos = in.getPosition();
-    boolean hasMore = in.next(key);
-    if (pos >= end && in.syncSeen()) {
+    
+    more = in.next(key);
+    if (!more) {
+      return false;
+    }
+    
+    long lastSeenSyncPos = in.lastSeenSyncPos();
+    if(lastSeenSyncPos >= end) {
       more = false;
-    } else {
-      more = hasMore;
+      return more;
     }
     return more;
   }

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Jan 26 07:38:20 2010
@@ -30,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -47,7 +48,12 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 
 public class TestRCFile extends TestCase {
 
@@ -374,4 +380,79 @@
     long cost = System.currentTimeMillis() - start;
     LOG.debug("reading fully costs:" + cost + " milliseconds");
   }
+  
+  public void testSynAndSplit() throws IOException {
+    splitBeforeSync();
+    splitRightBeforeSync();
+    splitInMiddleOfSync();
+    splitRightAfterSync();
+    splitAfterSync();
+  }
+
+  private void splitBeforeSync() throws IOException {
+    writeThenReadByRecordReader(600, 1000, 2, 1, null);
+  }
+  
+  private void splitRightBeforeSync() throws IOException {
+    writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+  }
+  
+  private void splitInMiddleOfSync() throws IOException {
+    writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+    
+  }
+  
+  private void splitRightAfterSync() throws IOException {
+    writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+  }
+  
+  private void splitAfterSync() throws IOException {
+    writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+  }
+
+  private void writeThenReadByRecordReader(int intervalRecordCount,
+      int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)
+      throws IOException {
+    Path testDir = new Path(System.getProperty("test.data.dir", ".") + "/mapred/testsmallfirstsplit");
+    Path testFile = new Path(testDir, "test_rcfile");
+    fs.delete(testFile, true);
+    Configuration cloneConf = new Configuration(conf);
+    RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+    cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+
+    RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+    for (int i = 0; i < bytesArray.length; i++) {
+      BytesRefWritable cu = null;
+      cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+      bytes.set(i, cu);
+    }
+    for (int i = 0; i < writeCount; i++) {
+      if(i == intervalRecordCount)
+        System.out.println("write position:" + writer.getLength());
+      writer.append(bytes);
+    }
+    writer.close();
+    
+    RCFileInputFormat inputFormat = new RCFileInputFormat();
+    JobConf jonconf = new JobConf(cloneConf);
+    jonconf.set("mapred.input.dir", testDir.toString());
+    jonconf.setLong("mapred.min.split.size", minSplitSize);
+    InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber);
+    assertEquals("splits length should be " + splitNumber, splits.length, splitNumber);
+    int readCount = 0;
+    for (int i = 0; i < splits.length; i++) {
+      int previousReadCount = readCount;
+      RecordReader rr = inputFormat.getRecordReader(splits[i], jonconf, Reporter.NULL);
+      Object key = rr.createKey();
+      Object value = rr.createValue();
+      while(rr.next(key, value)) 
+        readCount ++;
+      System.out.println("The " + i + "th split read "
+          + (readCount - previousReadCount));
+    }
+    assertEquals("readCount should be equal to writeCount", readCount, writeCount);
+  }
+  
 }