You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/01/03 03:53:13 UTC
git commit: CRUNCH-316: Integration test for SafeAvroSerialization
and ArrayIndexOutOfBoundsException
Updated Branches:
refs/heads/master b4cad9454 -> 64c20ad9c
CRUNCH-316: Integration test for SafeAvroSerialization and ArrayIndexOutOfBoundsException
Signed-off-by: Micah Whitacre <mk...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/64c20ad9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/64c20ad9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/64c20ad9
Branch: refs/heads/master
Commit: 64c20ad9c2361786c225d874561511ec62dd408d
Parents: b4cad94
Author: Ben Roling <be...@cerner.com>
Authored: Thu Jan 2 16:55:53 2014 -0600
Committer: Micah Whitacre <mk...@apache.org>
Committed: Thu Jan 2 20:30:47 2014 -0600
----------------------------------------------------------------------
.../types/avro/SafeAvroSerializationIT.java | 159 +++++++++++++++++++
crunch-core/src/it/resources/CRUNCH-316.avsc | 39 +++++
2 files changed, 198 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/64c20ad9/crunch-core/src/it/java/org/apache/crunch/types/avro/SafeAvroSerializationIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/types/avro/SafeAvroSerializationIT.java b/crunch-core/src/it/java/org/apache/crunch/types/avro/SafeAvroSerializationIT.java
new file mode 100644
index 0000000..1bbade9
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/types/avro/SafeAvroSerializationIT.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class SafeAvroSerializationIT implements Serializable {
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ /**
+ * Test to prove CRUNCH-316 has been fixed
+ */
+ @Test
+ public void testMapBufferTooSmallException() throws IOException {
+ Configuration configuration = tmpDir.getDefaultConfiguration();
+
+ // small io.sort.mb to make the test run faster with less resources
+ configuration.set("io.sort.mb", "1");
+
+ Pipeline pipeline = new MRPipeline(SafeAvroSerializationIT.class,
+ configuration);
+
+ Schema schema = new Schema.Parser().parse(tmpDir
+ .copyResourceFile("CRUNCH-316.avsc"));
+
+ PTable<String, GenericData.Record> leftSide = pipeline.read(
+ At.avroFile(
+ new Path(populateLeftSide(schema).getAbsolutePath()),
+ Avros.generics(schema))).by(
+ new MapFn<GenericData.Record, String>() {
+ @Override
+ public String map(GenericData.Record input) {
+ return (String) input.get("tag").toString();
+ }
+ }, Avros.strings());
+
+ PTable<String, String> rightSide = pipeline.read(
+ At.avroFile(new Path(populateRightSide().getAbsolutePath()),
+ Avros.strings())).by(new MapFn<String, String>() {
+ @Override
+ public String map(String input) {
+ return input;
+ }
+ }, Avros.strings());
+
+ PTable<String, org.apache.crunch.Pair<GenericData.Record, String>> joinedTable = leftSide
+ .join(rightSide);
+
+ // if CRUNCH-316 isn't fixed, this will result in an
+ // ArrayIndexOutOfBoundsException in the reduce
+ Collection<Pair<String, Pair<Record, String>>> joinRows = joinedTable
+ .asCollection().getValue();
+
+ assertEquals(1, joinRows.size());
+ Pair<String, Pair<Record, String>> firstRow = joinRows.iterator()
+ .next();
+ assertEquals("c", firstRow.first());
+ assertEquals("c", firstRow.second().first().get("tag").toString());
+ assertEquals(createString('c', 40),
+ firstRow.second().first().get("data1").toString());
+ assertEquals(null, firstRow.second().first().get("data2"));
+ assertEquals("c", firstRow.second().second());
+ }
+
+ private File populateLeftSide(Schema schema) throws IOException {
+ File file = tmpDir.getFile("leftSide.avro");
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(
+ schema);
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
+ datumWriter);
+ dataFileWriter.create(schema, file);
+
+ GenericRecord record = new GenericData.Record(schema);
+
+ // RECORD 1
+ record.put("tag", "b");
+ record.put("data1", createString('b', 996100));
+
+ // buffer space has to run out on a write of less than 512 bytes for the
+ // issue to occur
+ record.put("data2", createString('b', 250));
+
+ dataFileWriter.append(record);
+
+ // RECORD 2 -- this record will be corrupted with overflow from RECORD 1
+ record.put("tag", "c");
+ record.put("data1", createString('c', 40));
+ record.put("data2", null);
+ dataFileWriter.append(record);
+
+ dataFileWriter.close();
+ return file;
+ }
+
+ private File populateRightSide() throws IOException {
+ File file = tmpDir.getFile("rightSide.avro");
+ DatumWriter<String> datumWriter = new GenericDatumWriter<String>(Avros
+ .strings().getSchema());
+ DataFileWriter<String> dataFileWriter = new DataFileWriter<String>(
+ datumWriter);
+ dataFileWriter.create(Avros.strings().getSchema(), file);
+
+ // will join successfully to RECORD 2 from left side
+ dataFileWriter.append("c");
+
+ dataFileWriter.close();
+ return file;
+ }
+
+ private static String createString(Character ch, int len) {
+ StringBuilder buffer = new StringBuilder(len);
+ for (int i = 0; i < len; i++) {
+ buffer.append(ch);
+ }
+ return buffer.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/64c20ad9/crunch-core/src/it/resources/CRUNCH-316.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/CRUNCH-316.avsc b/crunch-core/src/it/resources/CRUNCH-316.avsc
new file mode 100644
index 0000000..178fa94
--- /dev/null
+++ b/crunch-core/src/it/resources/CRUNCH-316.avsc
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+{
+ "name": "rec",
+ "namespace": "crunch",
+ "type": "record",
+ "fields": [
+ {
+ "name": "tag",
+ "type": "string"
+ },
+ {
+ "name": "data1",
+ "type": "string"
+ },
+ {
+ "name": "data2",
+ "type": [
+ "string",
+ "null"
+ ]
+ }
+ ]
+}