You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by tz...@apache.org on 2015/02/11 09:28:15 UTC

crunch git commit: CRUNCH-491: Use the CharsetEncoder to compute the raw byte size of a char

Repository: crunch
Updated Branches:
  refs/heads/master 050b4a9e1 -> 5e6e33536


CRUNCH-491: Use the CharsetEncoder to compute the raw byte size of a char

Signed-off-by: tzolov <ch...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5e6e3353
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5e6e3353
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5e6e3353

Branch: refs/heads/master
Commit: 5e6e335363d86630a14067b37bd8efc6e5bc1607
Parents: 050b4a9
Author: tzolov <ch...@gmail.com>
Authored: Thu Feb 5 16:32:58 2015 +0100
Committer: tzolov <ch...@gmail.com>
Committed: Wed Feb 11 09:16:54 2015 +0100

----------------------------------------------------------------------
 .../crunch/io/text/xml/XmlInputFormat.java      |  24 +++-
 .../apache/crunch/io/text/xml/XmlSource.java    |   2 +-
 .../crunch/io/text/xml/XmlRecordReaderTest.java | 117 +++++++++++++++++++
 .../src/main/resources/xmlSourceSample3.xml     |  27 +++++
 4 files changed, 165 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java
index 58157fe..79f867c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.UnsupportedEncodingException;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -40,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
-import com.google.common.primitives.Chars;
 
 /**
  * Reads records that are delimited by a specific begin/end tag.
@@ -67,7 +69,7 @@ public class XmlInputFormat extends TextInputFormat {
 
   /**
    * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified by the
-   * start tag and end tag
+   * start tag and end tag.
    */
   public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
 
@@ -84,7 +86,9 @@ public class XmlInputFormat extends TextInputFormat {
     private final BufferedReader inReader;
     private final OutputStreamWriter outWriter;
     private final String inputEncoding;
-    private int readByteCounter = 0;
+    private long readByteCounter;
+
+    private CharsetEncoder charsetEncoder;
 
     public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
       inputEncoding = conf.get(ENCODING, DEFAULT_ENCODING);
@@ -98,9 +102,12 @@ public class XmlInputFormat extends TextInputFormat {
       FileSystem fs = file.getFileSystem(conf);
       FSDataInputStream fsin = fs.open(split.getPath());
       fsin.seek(start);
+      readByteCounter =  start;
       inReader = new BufferedReader(new InputStreamReader(fsin, Charset.forName(inputEncoding)));
       outBuffer = new DataOutputBuffer();
       outWriter = new OutputStreamWriter(outBuffer, inputEncoding);
+      
+      charsetEncoder = Charset.forName(inputEncoding).newEncoder();
     }
 
     private boolean next(LongWritable key, Text value) throws IOException {
@@ -142,7 +149,7 @@ public class XmlInputFormat extends TextInputFormat {
       while (true) {
         int nextInCharacter = inReader.read();
 
-        readByteCounter = +Chars.toByteArray((char) nextInCharacter).length;
+        readByteCounter = readByteCounter + calculateCharacterByteLength((char) nextInCharacter);
 
         // end of file:
         if (nextInCharacter == -1) {
@@ -189,5 +196,14 @@ public class XmlInputFormat extends TextInputFormat {
       currentValue = new Text();
       return next(currentKey, currentValue);
     }
+    
+    private int calculateCharacterByteLength(final char character) {
+      try {
+        return charsetEncoder.encode(CharBuffer.wrap(new char[] { character })).limit();
+      } catch (final CharacterCodingException e) {
+        throw new RuntimeException("The character attempting to be read (" + character + ") could not be encoded with "
+            + inputEncoding);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java
index 2e434e7..c6ebb5e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets;
 
 /**
  * Large XML documents composed of repetitive XML elements can be broken into chunks delimited by element's start and
- * end tag. The {@link XmlSource2} process XML files and extract out the XML between the pre-configured start / end
+ * end tag. The {@link XmlSource} process XML files and extract out the XML between the pre-configured start / end
  * tags. Developer should process the content between the tags.
  * 
  * The {@link XmlSource} does not parse the input XML files and is not aware of the XML semantics. It just splits the

http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java
new file mode 100644
index 0000000..9876b46
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.xml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.text.xml.XmlInputFormat.XmlRecordReader;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * {@link XmlRecordReader} Test.
+ *
+ */
+public class XmlRecordReaderTest {
+
+  @Rule
+  public transient TemporaryPath tmpDir = new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir");
+
+  private Configuration conf;
+
+  private String xmlFile;
+
+  private long xmlFileLength;
+
+  @Before
+  public void before() throws IOException {
+    xmlFile = tmpDir.copyResourceFileName("xmlSourceSample3.xml");
+    xmlFileLength = getFileLength(xmlFile);
+
+    conf = new org.apache.hadoop.conf.Configuration();
+    conf.set(XmlInputFormat.START_TAG_KEY, "<PLANT");
+    conf.set(XmlInputFormat.END_TAG_KEY, "</PLANT>");
+  }
+
+  @Test
+  public void testStartOffsets() throws Exception {
+    /*
+     * The xmlSourceSample3.xml file byte ranges:
+     *
+     * 50-252 - first PLANT element.
+     *
+     * 254-454 - second PLANT element.
+     *
+     * 456-658 - third PLANT element.
+     */
+    assertEquals("Starting from offset 0 should read all elements", 3, readXmlElements(createSplit(0, xmlFileLength)));
+    assertEquals("Offset is in the middle of the first element. Should read only the remaining 2 elements", 2,
+        readXmlElements(createSplit(100, xmlFileLength)));
+    assertEquals("Offset is in the middle of the second element. Should read only the remaining 1 element", 1,
+        readXmlElements(createSplit(300, xmlFileLength)));
+    assertEquals("Offset is in the middle of the third element. Should read no elements", 0,
+        readXmlElements(createSplit(500, xmlFileLength)));
+  }
+
+  @Test
+  public void readThroughSplitEnd() throws IOException, InterruptedException {
+    // Third element starts at position: 456 and has length: 202
+    assertEquals("Split starts before the 3rd element and ends in the middle of the 3rd element.", 1,
+        readXmlElements(createSplit(300, ((456 - 300) + 202 / 2))));
+    assertEquals("Split starts and ends before the 3rd element.", 0, readXmlElements(createSplit(300, (456 - 300))));
+  }
+
+  private FileSplit createSplit(long offset, long length) {
+    return new FileSplit(new Path(xmlFile), offset, length, new String[] {});
+  }
+
+  private long readXmlElements(FileSplit split) throws IOException, InterruptedException {
+
+    int elementCount = 0;
+
+    XmlRecordReader xmlRecordReader = new XmlRecordReader(split, conf);
+    try {
+      long lastKey = 0;
+      while (xmlRecordReader.nextKeyValue()) {
+        elementCount++;
+        assertTrue(xmlRecordReader.getCurrentKey().get() > lastKey);
+        lastKey = xmlRecordReader.getCurrentKey().get();
+        assertTrue(xmlRecordReader.getCurrentValue().getLength() > 0);
+      }
+    } finally {
+      xmlRecordReader.close();
+    }
+
+    return elementCount;
+  }
+
+  private long getFileLength(String fileName) throws FileNotFoundException, IOException {
+    return new File(fileName).length();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-test/src/main/resources/xmlSourceSample3.xml
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/xmlSourceSample3.xml b/crunch-test/src/main/resources/xmlSourceSample3.xml
new file mode 100644
index 0000000..9db87fe
--- /dev/null
+++ b/crunch-test/src/main/resources/xmlSourceSample3.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<CATALOG>
+	<PLANT>
+		<COMMON>Bloodroot</COMMON>
+		<BOTANICAL>Sanguinaria canadensis</BOTANICAL>
+		<ZONE>4</ZONE>
+		<LIGHT>Mostly Shady</LIGHT>
+		<PRICE>$2.44</PRICE>
+		<AVAILABILITY>031599</AVAILABILITY>
+	</PLANT>
+	<PLANT>
+		<COMMON>Columbine</COMMON>
+		<BOTANICAL>Aquilegia canadensis</BOTANICAL>
+		<ZONE>3</ZONE>
+		<LIGHT>Mostly Shady</LIGHT>
+		<PRICE>$9.37</PRICE>
+		<AVAILABILITY>030699</AVAILABILITY>
+	</PLANT>
+	<PLANT>
+		<COMMON>Marsh Marigold</COMMON>
+		<BOTANICAL>Caltha palustris</BOTANICAL>
+		<ZONE>4</ZONE>
+		<LIGHT>Mostly Sunny</LIGHT>
+		<PRICE>$6.81</PRICE>
+		<AVAILABILITY>051799</AVAILABILITY>
+	</PLANT>
+</CATALOG>