You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/01/24 23:02:20 UTC
svn commit: r615045 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/PigStorage.java
src/org/apache/pig/builtin/TextLoader.java
src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
test/org/apache/pig/test/TestBuiltin.java
Author: olga
Date: Thu Jan 24 14:02:19 2008
New Revision: 615045
URL: http://svn.apache.org/viewvc?rev=615045&view=rev
Log:
Fix for PIG-63
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Jan 24 14:02:19 2008
@@ -66,3 +66,5 @@
comparator function instead of Class.forName. (gates)
PIG-56: Made DataBag implement Iterable. (groves via gates)
+
+ PIG-63: Fix for non-ascii UTF-8 data (breed@ and olgan@)
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Jan 24 14:02:19 2008
@@ -17,18 +17,12 @@
*/
package org.apache.pig.builtin;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.Iterator;
+import java.nio.charset.Charset;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
-import org.apache.pig.data.TimestampedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -40,11 +34,10 @@
*/
public class PigStorage implements LoadFunc, StoreFunc {
protected BufferedPositionedInputStream in = null;
- private DataInputStream inData = null;
-
long end = Long.MAX_VALUE;
- private String recordDel = "\n";
+ private byte recordDel = (byte)'\n';
private String fieldDel = "\t";
+ final private static Charset utf8 = Charset.forName("UTF8");
public PigStorage() {
}
@@ -66,7 +59,7 @@
return null;
}
String line;
- if((line = inData.readLine()) != null) {
+ if((line = in.readLine(utf8, recordDel)) != null) {
return new Tuple(line, fieldDel);
}
return null;
@@ -74,11 +67,10 @@
public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
- inData = new DataInputStream(in);
this.end = end;
// Since we are not block aligned we throw away the first
- // record and cound on a different instance to read it
+ // record and count on a different instance to read it
if (offset != 0) {
getNext();
}
@@ -90,7 +82,7 @@
}
public void putNext(Tuple f) throws IOException {
- os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes());
+ os.write((f.toDelimitedString(this.fieldDel) + (char)this.recordDel).getBytes(utf8));
}
public void finish() throws IOException {
Modified: incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java Thu Jan 24 14:02:19 2008
@@ -17,8 +17,8 @@
*/
package org.apache.pig.builtin;
-import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import org.apache.pig.LoadFunc;
import org.apache.pig.data.DataAtom;
@@ -32,16 +32,14 @@
*/
public class TextLoader implements LoadFunc{
BufferedPositionedInputStream in;
- private DataInputStream inData = null;
-
+ final private static Charset utf8 = Charset.forName("UTF8");
long end;
public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
- inData = new DataInputStream(in);
this.end = end;
// Since we are not block aligned we throw away the first
- // record and cound on a different instance to read it
+ // record and count on a different instance to read it
if (offset != 0)
getNext();
}
@@ -50,7 +48,7 @@
if (in == null || in.getPosition() > end)
return null;
String line;
- if ((line = inData.readLine()) != null) {
+ if ((line = in.readLine(utf8, (byte)'\n')) != null) {
Tuple t = new Tuple(1);
t.setField(0, new DataAtom(line));
return t;
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Thu Jan 24 14:02:19 2008
@@ -20,6 +20,12 @@
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
import org.apache.tools.bzip2r.CBZip2InputStream;
@@ -66,5 +72,92 @@
return pos;
}
+ /*
+ * Preallocated array for readline buffering
+ */
+ private byte barray[] = new byte[1024];
+
+ /*
+ * Preallocated ByteBuffer for readline buffering
+ */
+ private ByteBuffer bbuff = ByteBuffer.wrap(barray);
+
+ /*
+ * Preallocated char array for decoding bytes
+ */
+ private char carray[] = new char[1024];
+
+ /*
+ * Preallocated CharBuffer for decoding bytes
+ */
+ private CharBuffer cbuff = CharBuffer.wrap(carray);
+ public String readLine(Charset charset, byte delimiter) throws IOException { // reads bytes up to (excluding) delimiter or EOF and decodes them with charset; returns null only when EOF arrives before any byte was read
+ CharsetDecoder decoder = charset.newDecoder(); // a new decoder is created on every call
+ decoder.onMalformedInput(CodingErrorAction.REPLACE); // malformed input is replaced instead of reported
+ decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); // same policy for unmappable characters
+ int delim = delimiter&0xff; // unsigned value of the delimiter; presumably read() yields 0..255 as InputStream.read() does - confirm
+ int rc;
+ int offset = 0; // count of bytes sitting in barray that have not been decoded yet
+ StringBuilder sb = null; // stays null until a full buffer has been decoded or the final decode is reached
+ CoderResult res;
+ while ((rc = read())!=-1) {
+ if (rc == delim) {
+ break; // the delimiter itself is consumed but never stored
+ }
+ barray[offset++] = (byte)rc;
+ if (barray.length == offset) { // buffer full: decode it and keep any undecoded tail
+ bbuff.position(0);
+ bbuff.limit(barray.length);
+ cbuff.position(0);
+ cbuff.limit(carray.length);
+ res = decoder.decode(bbuff, cbuff, false); // endOfInput=false: more bytes of this line may follow
+ if (res.isError()) {
+ throw new IOException("Decoding error: " + res.toString());
+ }
+ offset = bbuff.remaining(); // bytes the decoder left unconsumed; they sit at the end of barray
+ switch (offset) {
+ default: // three or more leftover bytes: move them to the front of barray
+ System.arraycopy(barray, bbuff.position(), barray, 0, bbuff
+ .remaining());
+ break;
+ case 2: // hand-written copy of the last two bytes to the front
+ barray[1] = barray[barray.length - 1];
+ barray[0] = barray[barray.length - 2];
+ break;
+ case 1: // hand-written copy of the last byte to the front
+ barray[0] = barray[barray.length - 1];
+ break;
+ case 0: // everything was consumed; nothing to carry over
+ }
+ if (sb == null) {
+ sb = new StringBuilder(cbuff.position());
+ }
+ sb.append(carray, 0, cbuff.position()); // cbuff wraps carray, so position() is the number of chars decoded
+ }
+ }
+ if (sb == null) {
+ if (rc == -1 && offset == 0) {
+ // We are at EOF with nothing read
+ return null;
+ }
+ sb = new StringBuilder();
+ }
+ bbuff.position(0);
+ bbuff.limit(offset); // restrict the final decode to the pending bytes
+ cbuff.position(0);
+ cbuff.limit(carray.length);
+ res = decoder.decode(bbuff, cbuff, true); // endOfInput=true: this is the last input for the line
+ if (res.isError()) {
+ System.out.println("Error"); // NOTE(review): only printed to stdout here, whereas the in-loop decode throws IOException
+ }
+ sb.append(carray, 0, cbuff.position());
+ cbuff.position(0);
+ res = decoder.flush(cbuff); // append whatever the decoder writes on flush
+ if (res.isError()) {
+ System.out.println("Error"); // NOTE(review): same stdout-only handling as the final decode above
+ }
+ sb.append(carray, 0, cbuff.position());
+ return sb.toString();
+ }
}
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Thu Jan 24 14:02:19 2008
@@ -18,9 +18,6 @@
package org.apache.pig.test;
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Iterator;
@@ -39,11 +36,8 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataMap;
import org.apache.pig.data.Tuple;
-import org.apache.pig.PigServer.ExecType;
import org.apache.pig.impl.builtin.ShellBagEvalFunc;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.PigContext;
public class TestBuiltin extends TestCase {
@@ -312,6 +306,28 @@
p1.bindTo(null, new BufferedPositionedInputStream(ffis1), 0, input1.getBytes().length);
Tuple f1 = p1.getNext();
assertTrue(f1.arity() == arity1);
+
+ LoadFunc p15 = new PigStorage();
+ StringBuilder sb = new StringBuilder();
+ int LOOP_COUNT = 1024;
+ for (int i = 0; i < LOOP_COUNT; i++) {
+ for (int j = 0; j < LOOP_COUNT; j++) {
+ sb.append(i + "\t" + i + "\t" + j % 2 + "\n");
+ }
+ }
+ FakeFSInputStream ffis15 = new FakeFSInputStream(sb.toString()
+ .getBytes());
+ p15.bindTo(null, new BufferedPositionedInputStream(ffis15), 0, input1
+ .getBytes().length);
+ int count = 0;
+ while (true) {
+ Tuple f15 = p15.getNext();
+ if (f15 == null)
+ break;
+ count++;
+ assertEquals(3, f15.arity());
+ }
+ assertEquals(LOOP_COUNT * LOOP_COUNT, count);
String input2 = ":this:has:a:leading:colon\n";
int arity2 = 6;