You are viewing a plain-text version of this content. (The link to its canonical version was lost in this plain-text copy.)
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/02/15 00:20:02 UTC

git commit: Fix UTF8 encoded outputs in MemPipeline.write

Repository: crunch
Updated Branches:
  refs/heads/master 1a160b653 -> ac3863e80


Fix UTF8 encoded outputs in MemPipeline.write


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac3863e8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac3863e8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac3863e8

Branch: refs/heads/master
Commit: ac3863e8019c33adae931d5d02e38e563e0b9c94
Parents: 1a160b6
Author: Josh Wills <jw...@apache.org>
Authored: Thu Feb 13 17:32:40 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Feb 14 15:05:29 2014 -0800

----------------------------------------------------------------------
 .../crunch/impl/mem/MemPipelineUTF8IT.java      | 89 ++++++++++++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java | 14 +--
 pom.xml                                         | 10 ++-
 3 files changed, 106 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
new file mode 100644
index 0000000..56b167a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import junit.framework.Assert;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.TextFileTarget;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Checks that MemPipeline and MRPipeline both write non-ASCII text output as UTF-8. */
+public class MemPipelineUTF8IT {
+  @Rule
+  public TemporaryPath baseTmpDir = TemporaryPaths.create();
+
+  /** Writes {@code text} to {@code filename}, encoded as UTF-8. */
+  private static void writeFile(String text, String filename) throws IOException {
+    Files.write(text, new File(filename), Charsets.UTF_8);
+  }
+
+  @Test
+  public void testText() throws Exception {
+    final String infilename = baseTmpDir.getFileName("input");
+    final String memOutFilename = baseTmpDir.getFileName("memPipelineOut");
+    final String mrOutFilename = baseTmpDir.getFileName("mrPipelineOut");
+    // Unicode escape keeps this literal independent of the source-file encoding.
+    final String expected = "s\u00faper";
+    new File(infilename).getParentFile().mkdirs();
+    writeFile(expected, infilename);
+
+    Pipeline memPipeline = MemPipeline.getInstance();
+    PCollection<String> memPColl = memPipeline.readTextFile(infilename);
+    Target memTarget = new TextFileTarget(memOutFilename);
+    memPipeline.write(memPColl, memTarget, WriteMode.OVERWRITE);
+    memPipeline.run();
+    // MemPipeline's default text output is named "out<N>.txt" under the target directory.
+    File[] memOutFiles = new File(memOutFilename).listFiles();
+    Assert.assertNotNull("MemPipeline output directory missing: " + memOutFilename, memOutFiles);
+    File actualMemOut = null;
+    for (File f : memOutFiles) {
+      if (f.getName().startsWith("out") && f.getName().endsWith(".txt")) {
+        actualMemOut = f;
+        break;
+      }
+    }
+    Assert.assertNotNull("No out*.txt file written by MemPipeline", actualMemOut);
+    String actualMemText = Files.readFirstLine(actualMemOut, Charsets.UTF_8);
+
+    Pipeline mrPipeline = new MRPipeline(getClass());
+    PCollection<String> mrPColl = mrPipeline.readTextFile(infilename);
+    Target mrTarget = new TextFileTarget(mrOutFilename);
+    mrPipeline.write(mrPColl, mrTarget, WriteMode.OVERWRITE);
+    mrPipeline.run();
+    String actualMrText = Files.readFirstLine(new File(mrOutFilename, "part-m-00000"), Charsets.UTF_8);
+    Assert.assertEquals("MR file mismatch", expected, actualMrText);
+    Assert.assertEquals("Mem file mismatch", expected, actualMemText);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 7ef9f4f..b3e9c54 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Charsets;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.logging.Log;
@@ -211,17 +212,20 @@ public class MemPipeline implements Pipeline {
             LOG.warn("Defaulting to write to a text file from MemPipeline");
             Path outputPath = new Path(path, "out" + outputIndex + ".txt");
             FSDataOutputStream os = fs.create(outputPath);
+            byte[] newLine = "\r\n".getBytes(Charsets.UTF_8);
             if (collection instanceof PTable) {
+              byte[] tab = "\t".getBytes(Charsets.UTF_8);
               for (Object o : collection.materialize()) {
                 Pair p = (Pair) o;
-                os.writeBytes(p.first().toString());
-                os.writeBytes("\t");
-                os.writeBytes(p.second().toString());
-                os.writeBytes("\r\n");
+                os.write(p.first().toString().getBytes(Charsets.UTF_8));
+                os.write(tab);
+                os.write(p.second().toString().getBytes(Charsets.UTF_8));
+                os.write(newLine);
               }
             } else {
               for (Object o : collection.materialize()) {
-                os.writeBytes(o.toString() + "\r\n");
+                os.write(o.toString().getBytes(Charsets.UTF_8));
+                os.write(newLine);
               }
             }
             os.close();

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 77def0c..8812f5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -643,7 +643,10 @@ under the License.
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.12</version>
           <configuration>
-            <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G</argLine>
+            <encoding>UTF-8</encoding>
+            <inputEncoding>UTF-8</inputEncoding>
+            <outputEncoding>UTF-8</outputEncoding>
+            <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G -Dfile.encoding=UTF-8</argLine>
           </configuration>
         </plugin>
         <plugin>
@@ -827,7 +830,10 @@ under the License.
         <artifactId>maven-failsafe-plugin</artifactId>
         <version>2.12</version>
         <configuration>
-          <argLine>-Xmx1G</argLine>
+          <encoding>UTF-8</encoding>
+          <inputEncoding>UTF-8</inputEncoding>
+          <outputEncoding>UTF-8</outputEncoding>
+          <argLine>-Xmx1G -Dfile.encoding=UTF-8</argLine>
           <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
         </configuration>
         <executions>