You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sz...@apache.org on 2017/07/22 11:38:47 UTC
svn commit: r1802676 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/impl/io/ test/org/apache/pig/data/
test/org/apache/pig/test/
Author: szita
Date: Sat Jul 22 11:38:47 2017
New Revision: 1802676
URL: http://svn.apache.org/viewvc?rev=1802676&view=rev
Log:
PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 22 11:38:47 2017
@@ -38,6 +38,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3655: BinStorage and InterStorage approach to record markers is broken (szita)
+
PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark mode after PIG-5157 (nkollar via szita)
PIG-4767: Partition filter not pushed down when filter clause references variable from another load path (knoguchi)
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Jul 22 11:38:47 2017
@@ -40,6 +40,24 @@ public class PigConfiguration {
*/
public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+
+ /**
+ * Sets the length of record markers in binary files produced by Pig between jobs.
+ * A longer byte sequence means less chance of collision with actual data;
+ * a shorter sequence means less overhead.
+ */
+ public static final String PIG_INTERSTORAGE_SYNCMARKER_SIZE = "pig.interstorage.syncmarker.size";
+ public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX = 16;
+ public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT = 10;
+ public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN = 2;
+
+ /**
+ * Defines the interval (in bytes) when a sync marker should be written into the binary file
+ */
+ public static final String PIG_INTERSTORAGE_SYNCMARKER_INTERVAL = "pig.interstorage.syncmarker.interval";
+ public static final long PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT = 2000;
+
+
/**
* Boolean value used to enable or disable fetching without a mapreduce job for DUMP. True by default
*/
Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Sat Jul 22 11:38:47 2017
@@ -42,16 +42,23 @@ import org.apache.pig.data.Tuple;
public class InterRecordReader extends RecordReader<Text, Tuple> {
private long start;
- private long pos;
+ private long lastDataPos;
private long end;
private BufferedPositionedInputStream in;
private Tuple value = null;
- public static final int RECORD_1 = 0x01;
- public static final int RECORD_2 = 0x02;
- public static final int RECORD_3 = 0x03;
private DataInputStream inData = null;
private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+ private byte[] syncMarker;
+ private long lastSyncPos = -1;
+ private long syncMarkerInterval;
+ private long dataBytesSeen = 0;
+
+ public InterRecordReader(int syncMarkerLength, long syncMarkerInterval) {
+ this.syncMarker = new byte[syncMarkerLength];
+ this.syncMarkerInterval = syncMarkerInterval;
+ }
+
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
@@ -60,63 +67,131 @@ public class InterRecordReader extends R
end = start + split.getLength();
final Path file = split.getPath();
- // open the file and seek to the start of the split
+ // open the file
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
- if (start != 0) {
- fileIn.seek(start);
+
+ // read the magic byte sequence serving as record marker but only if the file is not empty
+ if (!(start == 0 && end == 0)) {
+ fileIn.readFully(0, syncMarker, 0, syncMarker.length);
}
+
+ //seek to the start of the split
+ fileIn.seek(start);
+
in = new BufferedPositionedInputStream(fileIn, start);
inData = new DataInputStream(in);
}
-
- public boolean nextKeyValue() throws IOException {
+
+
+ /**
+ * Skips to next sync marker
+ * @return true if marker was observed, false if EOF or EndOfSplit was reached
+ * @throws IOException
+ */
+ private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
int b = 0;
- // skip to next record
- while (true) {
- if (in == null || in.getPosition() >=end) {
- return false;
- }
- // check if we saw RECORD_1 in our last attempt
- // this can happen if we have the following
- // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
- // After reading the second RECORD_1 in the above
- // sequence, we should not look for RECORD_1 again
- if(b != RECORD_1) {
+outer:while (b != -1) {
+ if (b != syncMarker[0]) {
+
+ //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
+ // because the records are from the next split which another reader would pick up too
+ if (in.getPosition() >= end) {
+ return false;
+ }
b = in.read();
- if(b != RECORD_1 && b != -1) {
+ if ((byte) b != syncMarker[0] && b != -1) {
continue;
}
- if(b == -1) return false;
+ if (b == -1) return false;
}
- b = in.read();
- if(b != RECORD_2 && b != -1) {
- continue;
+ int i = 1;
+ while (i < syncMarker.length) {
+ b = in.read();
+ if (b == -1) return false;
+ if ((byte) b != syncMarker[i]) {
+ continue outer;
+ }
+ ++i;
}
- if(b == -1) return false;
+ lastSyncPos = in.getPosition();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Reads a sync marker
+ * @return true if sync marker was read, false if EOF reached
+ * @throws IOException thrown if neither EOF nor proper sync was found
+ */
+ private boolean readSyncFullyOrEOF() throws IOException {
+ int b = in.read();
+ if (b == -1) {
+ //EOF reached
+ return false;
+ }
+ if ((byte) b != syncMarker[0]) {
+ throw new IOException("Corrupt data file, expected sync marker at position " + in.getPosition());
+ }
+ int i = 1;
+ while (i < syncMarker.length) {
b = in.read();
- if(b != RECORD_3 && b != -1) {
- continue;
+ if ((byte) b != syncMarker[i]) {
+ throw new IOException("Corrupt data file, expected sync marker at position " + in.getPosition());
}
- if(b == -1) return false;
- b = in.read();
- if(!BinInterSedes.isTupleByte((byte) b) &&
- b != -1) {
- continue;
+ ++i;
+ }
+ lastSyncPos = in.getPosition();
+ return true;
+
+ }
+
+ private boolean readDataOrEOF() throws IOException {
+ long preDataPos = in.getPosition();
+ int b = in.read();
+ if(!BinInterSedes.isTupleByte((byte) b) ) {
+ if (b == -1) {
+ //EOF reached
+ return false;
+ } else {
+ throw new IOException("Corrupt data file, expected tuple type byte, but seen " + b);
}
- if(b == -1) return false;
- break;
}
try {
- // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
- // sequence - lets now read the contents of the tuple
value = (Tuple)sedes.readDatum(inData, (byte)b);
- pos=in.getPosition();
+ lastDataPos = in.getPosition();
+ dataBytesSeen += (lastDataPos-preDataPos);
return true;
} catch (ExecException ee) {
throw ee;
}
+ }
+
+ public boolean nextKeyValue() throws IOException {
+
+ //No marker has been seen, look for next marker
+ if (lastSyncPos == -1) {
+ if (!skipUntilMarkerOrSplitEndOrEOF()) {
+ return false;
+ }
+ }
+
+ //If we've read at least as much data as the sync interval, we expect a sync marker or EOF
+ if (dataBytesSeen >= syncMarkerInterval) {
+ boolean isEOF = !readSyncFullyOrEOF();
+ if (isEOF) {
+ return false;
+ }
+ dataBytesSeen = 0;
+ //If we've just seen a (non-first) sync marker which was completely in the next split then we need to stop
+ if (in.getPosition()-syncMarker.length >= end) {
+ return false;
+ }
+ }
+ //Sync marker has been seen, expect data
+ return readDataOrEOF();
}
@Override
@@ -138,7 +213,7 @@ public class InterRecordReader extends R
if (start == end) {
return 0.0f;
} else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));
+ return Math.min(1.0f, (lastDataPos - start) / (float)(end - start));
}
}
Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java Sat Jul 22 11:38:47 2017
@@ -17,12 +17,16 @@
*/
package org.apache.pig.impl.io;
-import java.io.DataOutputStream;
import java.io.IOException;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Time;
import org.apache.pig.data.InterSedes;
import org.apache.pig.data.InterSedesFactory;
import org.apache.pig.data.Tuple;
@@ -35,20 +39,34 @@ import org.apache.pig.data.Tuple;
public class InterRecordWriter extends
RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
- public static final int RECORD_1 = 0x01;
- public static final int RECORD_2 = 0x02;
- public static final int RECORD_3 = 0x03;
private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+ private byte[] syncMarker;
+ private long lastSyncPos = -1;
+ private long syncMarkerInterval;
/**
* the outputstream to write out on
*/
- private DataOutputStream out;
+ private FSDataOutputStream out;
/**
*
*/
- public InterRecordWriter(DataOutputStream out) {
+ public InterRecordWriter(FSDataOutputStream out, int syncMarkerLength, long syncMarkerInterval) {
this.out = out;
+ this.syncMarkerInterval = syncMarkerInterval;
+ syncMarker = new byte[syncMarkerLength];
+
+ try {
+ MessageDigest digester = MessageDigest.getInstance("MD5");
+ long time = Time.now();
+ digester.update((new UID()+"@"+time).getBytes());
+ byte[] generatedMarker = digester.digest();
+ System.arraycopy(generatedMarker, 0, syncMarker, 0, syncMarkerLength);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+
}
/* (non-Javadoc)
@@ -66,10 +84,11 @@ public class InterRecordWriter extends
@Override
public void write(WritableComparable wc, Tuple t) throws IOException,
InterruptedException {
- // we really only want to write the tuple (value) out here
- out.write(RECORD_1);
- out.write(RECORD_2);
- out.write(RECORD_3);
+ // we really only want to write the tuple (value) out here (and a sync marker before that if necessary)
+ if (lastSyncPos == -1 || out.getPos() >= (lastSyncPos + syncMarkerInterval)) {
+ out.write(syncMarker);
+ lastSyncPos = out.getPos();
+ }
sedes.writeDatum(out, t);
}
Modified: pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Sat Jul 22 11:38:47 2017
@@ -39,6 +39,7 @@ import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
@@ -62,10 +63,10 @@ implements StoreFuncInterface, LoadMetad
private static final Log mLog = LogFactory.getLog(InterStorage.class);
public static final String useLog = "Pig Internal storage in use";
-
+
private InterRecordReader recReader = null;
private InterRecordWriter recWriter = null;
-
+
/**
* Simple binary nested reader format
*/
@@ -102,7 +103,9 @@ implements StoreFuncInterface, LoadMetad
public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
- return new InterRecordReader();
+ return new InterRecordReader(retrieveMarkerLengthFromConf(context.getConfiguration()),
+ retrieveMarkerIntervalFromConf(context.getConfiguration())
+ );
}
}
@@ -141,7 +144,10 @@ implements StoreFuncInterface, LoadMetad
Path file = getDefaultWorkFile(job, "");
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
- return new InterRecordWriter(fileOut);
+ return new InterRecordWriter(fileOut,
+ retrieveMarkerLengthFromConf(job.getConfiguration()),
+ retrieveMarkerIntervalFromConf(job.getConfiguration())
+ );
}
}
@@ -208,4 +214,22 @@ implements StoreFuncInterface, LoadMetad
public void cleanupOnSuccess(String location, Job job) throws IOException {
// DEFAULT: do nothing
}
+
+ private static int retrieveMarkerLengthFromConf(Configuration conf) {
+ int requestedLength = conf.getInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE,
+ PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT);
+
+ if (requestedLength > PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX) {
+ requestedLength = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX;
+ } else if (requestedLength < PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN) {
+ requestedLength = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN;
+ }
+
+ return requestedLength;
+ }
+
+ private static long retrieveMarkerIntervalFromConf(Configuration conf) {
+ return conf.getLong(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL,
+ PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
+ }
}
Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Sat Jul 22 11:38:47 2017
@@ -44,6 +44,7 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -506,30 +507,20 @@ public class TestSchemaTuple {
File temp = File.createTempFile("tmp", "tmp");
temp.deleteOnExit();
FileOutputStream fos = new FileOutputStream(temp);
- DataOutputStream dos = new DataOutputStream(fos);
+ FSDataOutputStream dos = new FSDataOutputStream(fos, null);
- InterRecordWriter writer = new InterRecordWriter(dos);
+ InterRecordWriter writer = new InterRecordWriter(dos,
+ PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
+ PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
- // We add these lines because a part of the InterStorage logic
- // is the ability to seek to the next Tuple based on a magic set
- // of bytes. This emulates the random byes that will be present
- // at the beginning of a split.
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
+ // This test does not cover the case of overlapping record bytes that may be present at the
+ // beginning of a split, for that see org.apache.pig.test.TestBinInterSedes#testInterStorageSyncMarker()
for (int i = 0; i < sz; i++) {
SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
fillWithData(st);
writer.write(null, st);
written.add(st);
-
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
- dos.writeByte(r.nextInt());
}
writer.close(null);
@@ -541,7 +532,8 @@ public class TestSchemaTuple {
InputSplit is = new FileSplit(new Path(temp.getAbsolutePath()), 0, temp.length(), null);
- InterRecordReader reader = new InterRecordReader();
+ InterRecordReader reader = new InterRecordReader(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
+ PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
reader.initialize(is, HadoopShims.createTaskAttemptContext(conf, taskId));
for (int i = 0; i < sz; i++) {
Modified: pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Sat Jul 22 11:38:47 2017
@@ -24,11 +24,16 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.BinInterSedes;
import org.apache.pig.data.DataBag;
@@ -296,6 +301,142 @@ public class TestBinInterSedes {
}
}
+
+ /*
+ The following tests are intended to verify the reading and writing of intermediate files of Pig (of InterStorage)
+ The test records are 11,14,22,14 bytes long.
+ Below I illustrate the splits in rows, records as [] with size and sync markers with [M]
+ */
+
+ /**
+ * One sync marker only and three splits where the records overlap the split ends.
+ * (Reader of 1st split should read every record, readers of 2nd and 3rd splits should read no records.)
+ * [M(10)] [11] [11-
+ * -3] [ 22 ] [7-
+ * -7]
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncMarkerOneMarkerAtBeginningOnly() throws Exception {
+ testInterStorageSyncMarker(32, 10, 2000L);
+ }
+
+ /**
+ * Some sync markers are positioned so that they begin at a split's end and they end in the next split's beginning.
+ * (Reader of a split has to read until the next sync marker that has all its bytes in a following split.)
+ * @throws Exception
+ */
+ @Test
+ public void testSyncMarkerOverlappingMarker() throws Exception {
+ /*
+ * [M(16)] [11] [M(16)] [5-
+ * -9] [M(16)] [ 22 ] [M(1-
+ * -15)] [14]
+ */
+ testInterStorageSyncMarker(48, 16, 10L);
+ /*
+ * [M(4)] [ 4-
+ * -7] [1-
+ * - 8 -
+ * -5] [M(3-
+ * -1)] [ 7-
+ * - 8 -
+ * -7] [M(1-
+ * -3)] [ 5-
+ * - 8 -
+ * -1]
+ */
+ testInterStorageSyncMarker(8, 4, 20L);
+ }
+
+ /**
+ * No illustration for this one to save characters .. Sync size is over 3 times the size of split size, this is an
+ * extremely unlikely scenario. Markers here span over 4 splits.
+ * @throws Exception
+ */
+ @Test
+ public void testSyncMarkerLongerMarkerThanSplit() throws Exception {
+ testInterStorageSyncMarker(5, 16, 20L);
+ }
+
+ /**
+ * A sync marker is positioned at exactly the end of the first split without overlapping into the next one.
+ * (Reader of the 1st split should read past it and into the 2nd split until next marker.)
+ *
+ * [M(2)] [11] [14] [M(2)]
+ * [ 22 ] [M(2)] [ 5-
+ * -9]
+ * @throws Exception
+ */
+ @Test
+ public void testSyncMarkerMarkerOnSplitEnd() throws Exception {
+ testInterStorageSyncMarker(29, 2, 20L);
+ }
+
+ /**
+ * A sync marker is positioned at exactly the beginning of the 3rd split.
+ * (Reader of the 1st split should read 1st and 2nd splits fully, reader of 2nd split should read no records.)
+ *
+ * [M(3)] [11]
+ * [ 14 ]
+ * [M(3)] [11-
+ * -11 ] [3-
+ * -11 ]
+ * @throws Exception
+ */
+ @Test
+ public void testSyncMarkerMarkerOnSplitBeginning() throws Exception {
+ testInterStorageSyncMarker(14, 3, 25L);
+ }
+
+ private void testInterStorageSyncMarker(int maxSplitSize, int syncSize, long syncInterval) throws Exception {
+ PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
+
+ Properties pigProperties = pigServer.getPigContext().getProperties();
+ pigProperties.setProperty("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(maxSplitSize));
+ pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, String.valueOf(syncSize));
+ pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL, String.valueOf(syncInterval));
+
+ //Without proper random record markers 0x01020327 would be identified as a marker and 0x50 as an unknown datatype
+ //ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x50, 0x0, 0x0, 0x0}).getLong() => 72624011372134400
+
+ String[] inputData = new String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624011372134400","orange\t4\t4"};
+ String[] expected = new String[] {"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624011372134400)","(orange,4,4)"};
+ File inputFile = Util.createInputFile("interStorageInput", "", inputData);
+ inputFile.deleteOnExit();
+
+ //Without proper random record markers 0x01020327 would be identified as a marker and although no errors are
+ // thrown the result will contain incorrect schema and values past this number
+ //ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x01, 0x0, 0x0, 0x0}).getLong() => 72624010046734336
+
+ String[] inputData2 = new String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624010046734336","orange\t4\t4"};
+ String[] expected2 = new String[] {"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624010046734336)","(orange,4,4)"};
+ File inputFile2 = Util.createInputFile("interStorageInput2", "", inputData2);
+ inputFile2.deleteOnExit();
+
+ File binOutputdir = new File("build/test/interStorageTest");
+ Util.deleteDirectory(binOutputdir);
+
+ String script = "A = LOAD '"+inputFile.getAbsolutePath()+"' AS (name:chararray, cnt:int, cnt2:long);\n" +
+ "STORE A INTO '"+binOutputdir.getAbsolutePath()+"' USING org.apache.pig.impl.io.InterStorage();\n" +
+ "\n" +
+ "B = LOAD '"+binOutputdir.getAbsolutePath()+"' USING org.apache.pig.impl.io.InterStorage();\n";
+
+ pigServer.registerQuery(script);
+ Iterator<Tuple> it = pigServer.openIterator("B");
+ Util.checkQueryOutputsAfterSortRecursive(it, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B")));
+
+ Util.deleteDirectory(binOutputdir);
+
+ pigServer.registerQuery(script.replaceAll(inputFile.getAbsolutePath(), inputFile2.getAbsolutePath()));
+ it = pigServer.openIterator("B");
+ Util.checkQueryOutputsAfterSortRecursive(it, expected2,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B")));
+
+ }
+
private void testSerTuple(Tuple t, byte[] expected) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(baos);
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Sat Jul 22 11:38:47 2017
@@ -414,12 +414,18 @@ public class TestFRJoin2 {
pigServer.registerQuery("C = foreach C generate MAX(B.x) as x;");
pigServer.registerQuery("D = join A by x, B by x, C by x using 'repl';");
{
- // When the replicated input sizes=(12 + 5) is bigger than
- // pig.join.replicated.max.bytes=16, we throw exception
+ // When the replicated input size is bigger than
+ // pig.join.replicated.max.bytes, we throw exception
+ // Expected replicated size below:
+ // Alias B: sync marker + 2 records (1 tuple type byte + 2 integers (0 or 1))
+ // Alias C: sync marker + 1 record (1 tuple type byte + 1 integer (1))
+ long expectedReplicateSize = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 2*(1 +1+1)
+ + PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 1*(1 +1);
+
try {
pigServer.getPigContext().getProperties().setProperty(
PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES,
- String.valueOf(16));
+ String.valueOf(expectedReplicateSize-1));
pigServer.openIterator("D");
Assert.fail();
} catch (FrontendException e) {
@@ -428,10 +434,10 @@ public class TestFRJoin2 {
e.getCause().getCause().getCause().getMessage());
}
- // If we increase the size to 17, it should work
+ // If we increase the max size setting to the expected amount it works
pigServer.getPigContext().getProperties().setProperty(
PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES,
- String.valueOf(17));
+ String.valueOf(expectedReplicateSize));
pigServer.openIterator("D");
}
}