You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by tz...@apache.org on 2013/05/10 08:22:46 UTC
[2/3] git commit: Integration tests for CRUNCH-204
Integration tests for CRUNCH-204
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d7570483
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d7570483
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d7570483
Branch: refs/heads/master
Commit: d75704836d466bcaed0355782ad2bc559ffa2519
Parents: 2276ee0
Author: tzolov <tz...@apache.org>
Authored: Thu May 9 20:37:28 2013 +0200
Committer: tzolov <tz...@apache.org>
Committed: Thu May 9 20:37:28 2013 +0200
----------------------------------------------------------------------
.../apache/crunch/io/avro/AvroMemPipelineIT.java | 118 +++++++++++++++
1 files changed, 118 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d7570483/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
new file mode 100644
index 0000000..b997a52
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroMemPipelineIT implements Serializable {
+ // Round-trip tests: each test writes one record to an Avro file from a MemPipeline collection and reads it back.
+ private transient File avroFile; // target Avro file shared by all tests; assigned in setUp()
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+ // Before each test, point avroFile at "test.avro" obtained from the tmpDir rule.
+ @Before
+ public void setUp() throws IOException {
+ avroFile = tmpDir.getFile("test.avro");
+ }
+ // Writes one Person and asserts the record read back with Avros.records(Person.class) equals it.
+ @Test
+ public void testMemPipelienWithSpecificRecord() {
+ // Person built with an empty sibling list.
+ List<CharSequence> siblingnames = Lists.newArrayList();
+ Person writeRecord = new Person("John", 41, siblingnames);
+
+ final PCollection<Person> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+ // The write target is given by path only; the read below names its PType explicitly.
+ writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
+
+ PCollection<Person> readCollection = MemPipeline.getInstance().read(
+ At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
+ // Only the first materialized element is inspected.
+ Person readRecord = readCollection.materialize().iterator().next();
+
+ assertEquals(writeRecord, readRecord);
+ }
+ // Same round trip for a GenericRecord, read back with Avros.generics(...) built from the written record's schema.
+ @Test
+ public void testMemPipelienWithGenericRecord() {
+
+ GenericRecord writeRecord = createGenericRecord();
+
+ final PCollection<GenericRecord> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+ persons.write(To.avroFile(avroFile.getAbsolutePath()));
+
+ PCollection<Record> readCollection = MemPipeline.getInstance().read(
+ At.avroFile(avroFile.getAbsolutePath(), Avros.generics(writeRecord.getSchema())));
+
+ Record readRecord = readCollection.materialize().iterator().next();
+
+ assertEquals(writeRecord, readRecord);
+ }
+ // Builds a GenericData.Record on Person.SCHEMA$ with the "name", "age" and "siblingnames" fields set.
+ private GenericRecord createGenericRecord() {
+
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+
+ return savedRecord;
+ }
+ // Same round trip for a plain String, read back with Avros.reflects(...) on the value's runtime class.
+ @Test
+ public void testMemPipelienWithReflectionRecord() {
+
+ String writeRecord = "John Doe";
+
+ final PCollection<String> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+ persons.write(To.avroFile(avroFile.getAbsolutePath()));
+
+ PCollection<? extends String> readCollection = MemPipeline.getInstance().read(
+ At.avroFile(avroFile.getAbsolutePath(), Avros.reflects(writeRecord.getClass())));
+ // Held as Object and compared via toString(); NOTE(review): presumably the value read back may not be a java.lang.String - confirm.
+ Object readRecord = readCollection.materialize().iterator().next();
+
+ assertEquals(writeRecord, readRecord.toString());
+ }
+
+}