You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/31 01:56:27 UTC

crunch git commit: CRUNCH-495: Fix case class/SpecificRecord interactions in Scrunch

Repository: crunch
Updated Branches:
  refs/heads/master 4f2b1f25f -> 7157c0a1f


CRUNCH-495: Fix case class/SpecificRecord interactions in Scrunch


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7157c0a1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7157c0a1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7157c0a1

Branch: refs/heads/master
Commit: 7157c0a1f481ffa6e817065abce3159ef24ec067
Parents: 4f2b1f2
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jan 29 16:39:05 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jan 29 16:39:05 2015 -0800

----------------------------------------------------------------------
 crunch-scrunch/pom.xml                          | 17 +++++++++++++
 crunch-scrunch/src/it/avro/person.avsc          | 26 ++++++++++++++++++++
 .../org/apache/crunch/scrunch/JoinTest.scala    | 19 ++++++++++++--
 .../org/apache/crunch/scrunch/PTypeFamily.scala |  2 +-
 4 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7157c0a1/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 7f87c62..b361f77 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -191,6 +191,23 @@ under the License.
           </execution>
         </executions>
       </plugin>
+      <plugin> <!-- Runs the Avro "schema" goal with src/it/avro as the test schema directory; presumably this generates the org.apache.crunch.test.Person class imported by JoinTest (TODO confirm) -->
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <testSourceDirectory>${project.basedir}/src/it/avro/</testSourceDirectory>
+              <testOutputDirectory>target/generated-test-sources/</testOutputDirectory>
+            </configuration>
+         </execution>
+       </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/7157c0a1/crunch-scrunch/src/it/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/avro/person.avsc b/crunch-scrunch/src/it/avro/person.avsc
new file mode 100644
index 0000000..eb24071
--- /dev/null
+++ b/crunch-scrunch/src/it/avro/person.avsc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+"namespace": "org.apache.crunch.test",
+"name": "Person",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "age", "type": "int"},
+  {"name": "siblingnames", "type" : ["null", { "type": "array", "items": "string" }], "default": null } ]
+} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/7157c0a1/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
index eb3b677..35a6500 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
@@ -17,10 +17,12 @@
  */
 package org.apache.crunch.scrunch
 
+import com.google.common.collect.ImmutableList
 import org.apache.crunch.io.{From => from, To => to}
-
+import org.apache.crunch.test.Person
 import _root_.org.junit.Test
-import org.apache.crunch.lib.join.JoinType
+
+case class Foo(val bar: Int, val baz: String) // Test fixture: the case class that joinCaseWithSpecific serializes through Avros.caseClasses[Foo]
 
 class JoinTest extends CrunchSuite {
   lazy val pipeline = Pipeline.mapReduce[JoinTest](tempDir.getDefaultConfiguration)
@@ -53,4 +55,17 @@ class JoinTest extends CrunchSuite {
     assert(filtered.exists(_ == ("macbeth", 66)))
     pipeline.done
   }
+
+  @Test def joinCaseWithSpecific: Unit = { // CRUNCH-495: joins a case-class collection (Foo) with an Avro specific-record collection (Person)
+    val foos = pipeline.create(List(Foo(1, "a"), Foo(2, "b")), Avros.caseClasses[Foo])
+    val p1 = new Person()
+    p1.setName("Josh")
+    p1.setAge(35)
+    p1.setSiblingnames(ImmutableList.of("Kate", "Mike"))
+    val people = pipeline.create(List(p1), Avros.specifics[Person])
+
+    val res = foos.by(_ => "key").join(people.by(_ => "key")) // both sides are keyed by the same constant string "key"
+    println(res.materialize()) // NOTE(review): nothing is asserted here; presumably the test passes as long as materializing the join does not throw
+    pipeline.done()
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7157c0a1/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 433b471..19c9421 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -241,7 +241,7 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
     val args = ctor.paramss.head.map(x => (x.name.toString, typeToPType(x.typeSignature)))
     val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*)
     val rtc = currentMirror.runtimeClass(tpe)
-    val base = namedTuples(rtc.getCanonicalName, args)
+    val base = namedTuples(rtc.getCanonicalName + "_", args) // See CRUNCH-495
     ptf.derivedImmutable(rtc.asInstanceOf[Class[T]], new TypeMapFn[T](rtc), new TMapFn[T, TupleN](out), base)
   }