You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/01/26 08:38:20 UTC
svn commit: r903110 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Author: nzhang
Date: Tue Jan 26 07:38:20 2010
New Revision: 903110
URL: http://svn.apache.org/viewvc?rev=903110&view=rev
Log:
HIVE-1088: RCFile RecordReaders first split will read duplicate rows if the split end is < the first SYNC mark (Yongqiang He via Ning Zhang)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Jan 26 07:38:20 2010
@@ -1504,3 +1504,7 @@
HIVE-283. Do case insensitive comparison of aliases in partition
pruning. (athusoo)
+
+ HIVE-1088. RCFileRecordReader's first split will read duplicate rows if
+ the split end is < the first sync mark (Yongqiang He via Ning Zhang)
+
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Jan 26 07:38:20 2010
@@ -926,7 +926,9 @@
private final byte[] sync = new byte[SYNC_HASH_SIZE];
private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
private boolean syncSeen;
+ private long lastSeenSyncPos = 0;
+ private long headerEnd;
private final long end;
private int currentKeyLength;
private int currentRecordLength;
@@ -1098,6 +1100,7 @@
if (version > 1) { // if version > 1
in.readFully(sync); // read sync bytes
+ headerEnd = in.getPos();
}
}
@@ -1126,6 +1129,15 @@
seek(end);
return;
}
+
+ // this is to handle sync(pos) where pos < headerEnd.
+ if (position < headerEnd) {
+ // seek directly to first record
+ in.seek(headerEnd);
+ // note the sync marker "seen" in the header
+ syncSeen = true;
+ return;
+ }
try {
seek(position + 4); // skip escape
@@ -1184,6 +1196,7 @@
if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process
// a
// sync entry
+ lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) {
throw new IOException("File is corrupt!");
@@ -1440,6 +1453,11 @@
public boolean syncSeen() {
return syncSeen;
}
+
+ /** Returns the last seen sync position */
+ public long lastSeenSyncPos() {
+ return lastSeenSyncPos;
+ }
/** Returns the name of the file. */
@Override
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Tue Jan 26 07:38:20 2010
@@ -72,28 +72,15 @@
conf);
}
- private boolean firstSeen = true;
-
@Override
public boolean next(LongWritable key, BytesRefArrayWritable value)
throws IOException {
- if (!more) {
- return false;
- }
- long pos = in.getPosition();
- boolean hasMore = in.next(key);
- if (hasMore) {
+
+ more = next(key);
+
+ if (more) {
in.getCurrentRow(value);
}
- if (pos >= end && in.syncSeen() && !in.hasRecordsInBuffer()) {
- more = false;
- if (firstSeen) {
- firstSeen = false;
- return true;
- }
- } else {
- more = hasMore;
- }
return more;
}
@@ -101,12 +88,16 @@
if (!more) {
return false;
}
- long pos = in.getPosition();
- boolean hasMore = in.next(key);
- if (pos >= end && in.syncSeen()) {
+
+ more = in.next(key);
+ if (!more) {
+ return false;
+ }
+
+ long lastSeenSyncPos = in.lastSeenSyncPos();
+ if(lastSeenSyncPos >= end) {
more = false;
- } else {
- more = hasMore;
+ return more;
}
return more;
}
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=903110&r1=903109&r2=903110&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Jan 26 07:38:20 2010
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -47,7 +48,12 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
public class TestRCFile extends TestCase {
@@ -374,4 +380,79 @@
long cost = System.currentTimeMillis() - start;
LOG.debug("reading fully costs:" + cost + " milliseconds");
}
+
+ public void testSynAndSplit() throws IOException {
+ splitBeforeSync();
+ splitRightBeforeSync();
+ splitInMiddleOfSync();
+ splitRightAfterSync();
+ splitAfterSync();
+ }
+
+ private void splitBeforeSync() throws IOException {
+ writeThenReadByRecordReader(600, 1000, 2, 1, null);
+ }
+
+ private void splitRightBeforeSync() throws IOException {
+ writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+ }
+
+ private void splitInMiddleOfSync() throws IOException {
+ writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+
+ }
+
+ private void splitRightAfterSync() throws IOException {
+ writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+ }
+
+ private void splitAfterSync() throws IOException {
+ writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+ }
+
+ private void writeThenReadByRecordReader(int intervalRecordCount,
+ int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)
+ throws IOException {
+ Path testDir = new Path(System.getProperty("test.data.dir", ".") + "/mapred/testsmallfirstsplit");
+ Path testFile = new Path(testDir, "test_rcfile");
+ fs.delete(testFile, true);
+ Configuration cloneConf = new Configuration(conf);
+ RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+ cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+
+ RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+ for (int i = 0; i < bytesArray.length; i++) {
+ BytesRefWritable cu = null;
+ cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+ bytes.set(i, cu);
+ }
+ for (int i = 0; i < writeCount; i++) {
+ if(i == intervalRecordCount)
+ System.out.println("write position:" + writer.getLength());
+ writer.append(bytes);
+ }
+ writer.close();
+
+ RCFileInputFormat inputFormat = new RCFileInputFormat();
+ JobConf jonconf = new JobConf(cloneConf);
+ jonconf.set("mapred.input.dir", testDir.toString());
+ jonconf.setLong("mapred.min.split.size", minSplitSize);
+ InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber);
+ assertEquals("splits length should be " + splitNumber, splits.length, splitNumber);
+ int readCount = 0;
+ for (int i = 0; i < splits.length; i++) {
+ int previousReadCount = readCount;
+ RecordReader rr = inputFormat.getRecordReader(splits[i], jonconf, Reporter.NULL);
+ Object key = rr.createKey();
+ Object value = rr.createValue();
+ while(rr.next(key, value))
+ readCount ++;
+ System.out.println("The " + i + "th split read "
+ + (readCount - previousReadCount));
+ }
+ assertEquals("readCount should be equal to writeCount", readCount, writeCount);
+ }
+
}