You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/02/15 00:20:02 UTC
git commit: Fix UTF8 encoded outputs in MemPipeline.write
Repository: crunch
Updated Branches:
refs/heads/master 1a160b653 -> ac3863e80
Fix UTF8 encoded outputs in MemPipeline.write
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac3863e8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac3863e8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac3863e8
Branch: refs/heads/master
Commit: ac3863e8019c33adae931d5d02e38e563e0b9c94
Parents: 1a160b6
Author: Josh Wills <jw...@apache.org>
Authored: Thu Feb 13 17:32:40 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Feb 14 15:05:29 2014 -0800
----------------------------------------------------------------------
.../crunch/impl/mem/MemPipelineUTF8IT.java | 89 ++++++++++++++++++++
.../org/apache/crunch/impl/mem/MemPipeline.java | 14 +--
pom.xml | 10 ++-
3 files changed, 106 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
new file mode 100644
index 0000000..56b167a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import junit.framework.Assert;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.TextFileTarget;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class MemPipelineUTF8IT {
+ // Integration test: non-ASCII text must survive a text-file round trip through both MemPipeline and MRPipeline.
+ @Rule
+ public TemporaryPath baseTmpDir = TemporaryPaths.create(); // supplies the input/output paths used below via getFileName(...)
+ // Writes text to the given file encoded as UTF-8, independent of the JVM default charset.
+ private static void writeFile(String text, String filename) throws IOException {
+ Files.write(text, new File(filename), Charsets.UTF_8);
+ }
+ // Writes "súper" as UTF-8, copies it unchanged through each pipeline, and compares the first output line to it.
+ @Test
+ public void testText() throws Exception {
+ // "ú" is non-ASCII, so a writer that is not UTF-8 aware (e.g. DataOutputStream.writeBytes, which keeps only the low byte of each char) corrupts it.
+ final String infilename = baseTmpDir.getFileName("input");
+ final String memOutFilename = baseTmpDir.getFileName("memPipelineOut");
+ final String mrOutFilename = baseTmpDir.getFileName("mrPipelineOut");
+ final String expected = "súper";
+
+ new File(infilename).getParentFile().mkdirs(); // NOTE(review): mkdirs() result is ignored
+
+ writeFile(expected, infilename);
+
+ Pipeline memPipeline = MemPipeline.getInstance();
+ PCollection<String> memPColl = memPipeline.readTextFile(infilename);
+ Target memTarget = new TextFileTarget(memOutFilename);
+ memPipeline.write(memPColl, memTarget, WriteMode.OVERWRITE);
+ memPipeline.run();
+ File outDir = new File(memOutFilename);
+ File actualMemOut = null; // stays null if no matching output file is found below
+ for (File f : outDir.listFiles()) { // NOTE(review): listFiles() returns null if outDir is not a directory
+ String name = f.getName();
+ if (name.contains("out") && name.endsWith(".txt")) { // MemPipeline's text fallback names its file "out<index>.txt"
+ actualMemOut = f;
+ break;
+ }
+ }
+ String actualMemText = Files.readFirstLine(actualMemOut, Charsets.UTF_8); // NOTE(review): throws if actualMemOut is still null
+
+ Pipeline mrPipeline = new MRPipeline(getClass());
+ PCollection<String> mrPColl = mrPipeline.readTextFile(infilename);
+ Target mrTarget = new TextFileTarget(mrOutFilename);
+ mrPipeline.write(mrPColl, mrTarget, WriteMode.OVERWRITE);
+ mrPipeline.run();
+ String actualMrText = Files.readFirstLine(new File(mrOutFilename + "/part-m-00000"), Charsets.UTF_8); // assumes a single map-only output part; name is hard-coded — confirm if the job changes
+
+ Assert.assertEquals("MR file mismatch", expected, actualMrText);
+ Assert.assertEquals("Mem file mismatch", expected, actualMemText);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 7ef9f4f..b3e9c54 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Charsets;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.logging.Log;
@@ -211,17 +212,20 @@ public class MemPipeline implements Pipeline {
LOG.warn("Defaulting to write to a text file from MemPipeline");
Path outputPath = new Path(path, "out" + outputIndex + ".txt");
FSDataOutputStream os = fs.create(outputPath);
+ byte[] newLine = "\r\n".getBytes(Charsets.UTF_8);
if (collection instanceof PTable) {
+ byte[] tab = "\t".getBytes(Charsets.UTF_8);
for (Object o : collection.materialize()) {
Pair p = (Pair) o;
- os.writeBytes(p.first().toString());
- os.writeBytes("\t");
- os.writeBytes(p.second().toString());
- os.writeBytes("\r\n");
+ os.write(p.first().toString().getBytes(Charsets.UTF_8));
+ os.write(tab);
+ os.write(p.second().toString().getBytes(Charsets.UTF_8));
+ os.write(newLine);
}
} else {
for (Object o : collection.materialize()) {
- os.writeBytes(o.toString() + "\r\n");
+ os.write(o.toString().getBytes(Charsets.UTF_8));
+ os.write(newLine);
}
}
os.close();
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 77def0c..8812f5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -643,7 +643,10 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12</version>
<configuration>
- <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G</argLine>
+ <encoding>UTF-8</encoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ <outputEncoding>UTF-8</outputEncoding>
+ <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G -Dfile.encoding=UTF-8</argLine>
</configuration>
</plugin>
<plugin>
@@ -827,7 +830,10 @@ under the License.
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.12</version>
<configuration>
- <argLine>-Xmx1G</argLine>
+ <encoding>UTF-8</encoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ <outputEncoding>UTF-8</outputEncoding>
+ <argLine>-Xmx1G -Dfile.encoding=UTF-8</argLine>
<testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
</configuration>
<executions>