You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/19 08:54:01 UTC
[2/2] git commit: CRUNCH-93: Moving test support classes
CRUNCH-93: Moving test support classes
- Moved InMemoryEmitter to o.a.c.mem.emit package
- Created a TestContext in CrunchTestSupport class for unit testing of
various components
- Deleted unit test code from DoFn e.g. configurationForTest
- All things are retrieved from context
- Implemented an in-memory context that has only config elements for MemPipeline
Signed-off-by: Rahul Sharma <rs...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1cee024c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1cee024c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1cee024c
Branch: refs/heads/master
Commit: 1cee024c65938606d0d5794fbc4d10a1135f0f65
Parents: 14132b0
Author: Rahul Sharma <rs...@apache.org>
Authored: Fri Oct 12 21:40:52 2012 +0530
Committer: Rahul Sharma <rs...@apache.org>
Committed: Fri Oct 19 11:01:52 2012 +0530
----------------------------------------------------------------------
crunch-test/pom.xml | 5 +
.../org/apache/crunch/test/CrunchTestSupport.java | 70 ++++++++++++++-
.../java/org/apache/crunch/test/TestCounters.java | 41 +++++++++
.../apache/crunch/test/CrunchTestSupportTest.java | 53 +++++++++++
crunch/pom.xml | 7 ++
crunch/src/main/java/org/apache/crunch/DoFn.java | 50 ++---------
.../java/org/apache/crunch/fn/CompositeMapFn.java | 6 --
.../main/java/org/apache/crunch/fn/PairMapFn.java | 5 -
.../crunch/impl/mem/collect/MemCollection.java | 59 ++++++++++++-
.../crunch/impl/mem/emit/InMemoryEmitter.java | 57 ++++++++++++
.../crunch/io/avro/AvroFileReaderFactory.java | 1 -
.../apache/crunch/io/seq/SeqFileReaderFactory.java | 1 -
.../crunch/io/seq/SeqFileTableReaderFactory.java | 2 -
.../crunch/io/text/TextFileReaderFactory.java | 1 -
.../org/apache/crunch/test/InMemoryEmitter.java | 57 ------------
.../java/org/apache/crunch/test/TestCounters.java | 41 ---------
.../apache/crunch/types/avro/AvroTableType.java | 12 ---
.../java/org/apache/crunch/types/avro/Avros.java | 34 -------
.../apache/crunch/types/writable/Writables.java | 31 -------
.../org/apache/crunch/lib/join/JoinFnTestBase.java | 9 +-
.../java/org/apache/crunch/test/CountersTest.java | 11 +--
.../org/apache/crunch/types/avro/AvrosTest.java | 28 ++++--
22 files changed, 321 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-test/pom.xml b/crunch-test/pom.xml
index 6caf8ed..6378f31 100644
--- a/crunch-test/pom.xml
+++ b/crunch-test/pom.xml
@@ -59,6 +59,11 @@ under the License.
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
index 7f74931..fc1a77a 100644
--- a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
+++ b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
@@ -17,14 +17,76 @@
*/
package org.apache.crunch.test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.junit.Rule;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
- * A temporary workaround for Scala tests to use when working with Rule annotations
- * until it gets fixed in JUnit 4.11.
+ * A temporary workaround for Scala tests to use when working with Rule
+ * annotations until it gets fixed in JUnit 4.11.
*/
public class CrunchTestSupport {
@Rule
- public TemporaryPath tempDir = new TemporaryPath(
- "crunch.tmp.dir", "hadoop.tmp.dir");
+ public TemporaryPath tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir");
+
+ /**
+ * Creates a mocked {@linkplain TaskInputOutputContext} which can be used in
+ * unit tests. The context serves a very limited purpose: only counters, the
+ * task attempt ID, the status and the configuration are supported. Every call
+ * also resets the shared {@link TestCounters}.
+ *
+ * @param config the configuration returned by {@code getConfiguration()}
+ * @return a mock context whose counters are backed by {@link TestCounters}
+ */
+ public static <KI, VI, KO, VO> TaskInputOutputContext<KI, VI, KO, VO> getTestContext(final Configuration config) {
+ TaskInputOutputContext<KI, VI, KO, VO> context = Mockito.mock(TaskInputOutputContext.class);
+ TestCounters.clearCounters();
+ final StateHolder holder = new StateHolder();
+
+ Mockito.when(context.getCounter(Mockito.any(Enum.class))).then(new Answer<Counter>() {
+ @Override
+ public Counter answer(InvocationOnMock invocation) throws Throwable {
+ Enum<?> counter = (Enum<?>) invocation.getArguments()[0];
+ return TestCounters.getCounter(counter);
+ }
+ });
+
+ Mockito.when(context.getCounter(Mockito.anyString(), Mockito.anyString())).then(new Answer<Counter>() {
+ @Override
+ public Counter answer(InvocationOnMock invocation) throws Throwable {
+ // Argument 0 is the counter group and argument 1 is the counter name.
+ String group = (String) invocation.getArguments()[0];
+ String name = (String) invocation.getArguments()[1];
+ return TestCounters.getCounter(group, name);
+ }
+ });
+
+ Mockito.when(context.getConfiguration()).thenReturn(config);
+ Mockito.when(context.getTaskAttemptID()).thenReturn(new TaskAttemptID());
+
+ Mockito.when(context.getStatus()).then(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return holder.internalStatus;
+ }
+ });
+
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ holder.internalStatus = (String) invocation.getArguments()[0];
+ return null;
+ }
+ }).when(context).setStatus(Mockito.anyString());
+ return context;
+ }
+
+ static class StateHolder {
+ private String internalStatus;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
new file mode 100644
index 0000000..bcb4da1
--- /dev/null
+++ b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+/**
+ * A utility class used during unit testing to update and read counters.
+ */
+public class TestCounters {
+
+ private static Counters COUNTERS = new Counters();
+ /** Looks up the counter identified by the given enum in the shared counters. */
+ public static Counter getCounter(Enum<?> e) {
+ return COUNTERS.findCounter(e);
+ }
+ /** Looks up the counter with the given group and name in the shared counters. */
+ public static Counter getCounter(String group, String name) {
+ return COUNTERS.findCounter(group, name);
+ }
+ /** Discards all counters by replacing the shared instance with a new one. */
+ public static void clearCounters() {
+ COUNTERS = new Counters();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
new file mode 100644
index 0000000..65b86d9
--- /dev/null
+++ b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Test;
+
+public class CrunchTestSupportTest {
+ // Counter used to exercise the enum-based counter lookup.
+ enum CT {
+ ONE,
+ };
+ // Checks the configuration, task attempt ID, status and enum counter stubs.
+ @Test
+ public void testContext() {
+ Configuration sampleConfig = new Configuration();
+ String status = "test";
+ TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(sampleConfig);
+ assertEquals(sampleConfig, testContext.getConfiguration());
+ TaskAttemptID taskAttemptID = testContext.getTaskAttemptID();
+ assertEquals(taskAttemptID, testContext.getTaskAttemptID());
+ assertNotNull(taskAttemptID);
+ assertNull(testContext.getStatus());
+ testContext.setStatus(status);
+ assertEquals(status, testContext.getStatus());
+ assertEquals(0, testContext.getCounter(CT.ONE).getValue());
+ testContext.getCounter(CT.ONE).increment(1);
+ assertEquals(1, testContext.getCounter(CT.ONE).getValue());
+ testContext.getCounter(CT.ONE).increment(4);
+ assertEquals(5, testContext.getCounter(CT.ONE).getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
index 5dcc489..39dcc75 100644
--- a/crunch/pom.xml
+++ b/crunch/pom.xml
@@ -75,6 +75,13 @@ under the License.
<artifactId>jackson-mapper-asl</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>javassist</groupId>
+ <artifactId>javassist</artifactId>
+ <version>3.12.1.GA</version>
+ </dependency>
+
<!-- Both Protobufs and Thrift are supported as
derived serialization types, and you can use
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
index 7d516de..8d7cc17 100644
--- a/crunch/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch/src/main/java/org/apache/crunch/DoFn.java
@@ -19,7 +19,6 @@ package org.apache.crunch;
import java.io.Serializable;
-import org.apache.crunch.test.TestCounters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -38,8 +37,6 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
*/
public abstract class DoFn<S, T> implements Serializable {
private transient TaskInputOutputContext<?, ?, ?, ?> context;
- private transient Configuration testConf;
- private transient String internalStatus;
/**
* Configure this DoFn. Subclasses may override this method to modify the
@@ -61,8 +58,8 @@ public abstract class DoFn<S, T> implements Serializable {
* this method to do appropriate initialization.
*
* <p>
- * Called during the setup of the job instance this {@code DoFn} is
- * associated with.
+ * Called during the setup of the job instance this {@code DoFn} is associated
+ * with.
* </p>
*
*/
@@ -110,16 +107,6 @@ public abstract class DoFn<S, T> implements Serializable {
}
/**
- * Sets a {@code Configuration} instance to be used during unit tests.
- *
- * @param conf
- * The Configuration instance.
- */
- public void setConfigurationForTest(Configuration conf) {
- this.testConf = conf;
- }
-
- /**
* Returns an estimate of how applying this function to a {@link PCollection}
* will cause it to change in side. The optimizer uses these estimates to
* decide where to break up dependent MR jobs into separate Map and Reduce
@@ -138,25 +125,14 @@ public abstract class DoFn<S, T> implements Serializable {
}
protected Configuration getConfiguration() {
- if (context != null) {
- return context.getConfiguration();
- } else if (testConf != null) {
- return testConf;
- }
- return null;
+ return context.getConfiguration();
}
protected Counter getCounter(Enum<?> counterName) {
- if (context == null) {
- return TestCounters.getCounter(counterName);
- }
return context.getCounter(counterName);
}
protected Counter getCounter(String groupName, String counterName) {
- if (context == null) {
- return TestCounters.getCounter(groupName, counterName);
- }
return context.getCounter(groupName, counterName);
}
@@ -169,31 +145,19 @@ public abstract class DoFn<S, T> implements Serializable {
}
protected void progress() {
- if (context != null) {
- context.progress();
- }
+ context.progress();
}
protected TaskAttemptID getTaskAttemptID() {
- if (context != null) {
- return context.getTaskAttemptID();
- } else {
- return new TaskAttemptID();
- }
+ return context.getTaskAttemptID();
}
protected void setStatus(String status) {
- if (context != null) {
- context.setStatus(status);
- }
- this.internalStatus = status;
+ context.setStatus(status);
}
protected String getStatus() {
- if (context != null) {
- return context.getStatus();
- }
- return internalStatus;
+ return context.getStatus();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
index a41daf4..4714fe4 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -61,10 +61,4 @@ public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
first.configure(conf);
second.configure(conf);
}
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- first.setConfigurationForTest(conf);
- second.setConfigurationForTest(conf);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
index 63634ef..b25a6d8 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -55,9 +55,4 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
values.cleanup(null);
}
- @Override
- public void setConfigurationForTest(Configuration conf) {
- keys.setConfigurationForTest(conf);
- values.setConfigurationForTest(conf);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index a79ec2b..61bb1e7 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -17,8 +17,13 @@
*/
package org.apache.crunch.impl.mem.collect;
+import java.lang.reflect.Method;
import java.util.Collection;
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+
import org.apache.crunch.DoFn;
import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;
@@ -30,14 +35,20 @@ import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.fn.ExtractKeyFn;
import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.lib.Sample;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.test.InMemoryEmitter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -103,6 +114,7 @@ public class MemCollection<S> implements PCollection<S> {
@Override
public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
+ doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
doFn.initialize();
for (S s : collect) {
doFn.process(s, emitter);
@@ -214,4 +226,49 @@ public class MemCollection<S> implements PCollection<S> {
public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
}
+
+ /**
+ * Creates a {@link TaskInputOutputContext} that only provides the given
+ * {@linkplain Configuration}. It is built with javassist because the Hadoop
+ * API differs between versions: in Hadoop 1.0.3 {@code TaskInputOutputContext}
+ * is an abstract class, while in version 2 it is an interface.
+ * <p>
+ * Note: this provides only the bare essentials required to make
+ * {@linkplain MemPipeline} work; it offers no support for unit testing.
+ * @param conf the configuration returned by {@code getConfiguration()}
+ * @return a proxy context that intercepts only {@code getConfiguration()}
+ */
+ private static TaskInputOutputContext<?, ?, ?, ?> getInMemoryContext(final Configuration conf) {
+ ProxyFactory factory = new ProxyFactory();
+ Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
+ Class[] types = new Class[0];
+ Object[] args = new Object[0];
+ if (superType.isInterface()) {
+ factory.setInterfaces(new Class[] { superType });
+ } else {
+ types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
+ StatusReporter.class };
+ args = new Object[] { conf, new TaskAttemptID(), null, null, null };
+ factory.setSuperclass(superType);
+ }
+ factory.setFilter(new MethodFilter() {
+ @Override
+ public boolean isHandled(Method m) {
+ return m.getName().equals("getConfiguration");
+ }
+ });
+ MethodHandler handler = new MethodHandler() {
+ @Override
+ public Object invoke(Object self, Method thisMethod, Method proceed, Object[] methodArgs) throws Throwable {
+ return conf;
+ }
+ };
+ try {
+ Object newInstance = factory.create(types, args, handler);
+ return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
+ } catch (Exception e) {
+ // Propagate with the cause attached instead of also printing the stack trace.
+ throw new RuntimeException("Could not create in-memory TaskInputOutputContext", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
new file mode 100644
index 0000000..6976615
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.emit;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An {@code Emitter} instance that writes emitted records to a backing
+ * {@code List}.
+ *
+ * @param <T> the type of the emitted records
+ */
+public class InMemoryEmitter<T> implements Emitter<T> {
+
+ private final List<T> output;
+ /** Creates an emitter backed by a newly created list. */
+ public InMemoryEmitter() {
+ this(Lists.<T> newArrayList());
+ }
+ /** Creates an emitter that adds emitted records to the given list. */
+ public InMemoryEmitter(List<T> output) {
+ this.output = output;
+ }
+
+ @Override
+ public void emit(T emitted) {
+ output.add(emitted);
+ }
+
+ @Override
+ public void flush() {
+ // Nothing to flush: emit() adds records directly to the backing list.
+ }
+ /** Returns the backing list that emitted records are added to. */
+ public List<T> getOutput() {
+ return output;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index d1940cc..6f21dd2 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -69,7 +69,6 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
@Override
public Iterator<T> read(FileSystem fs, final Path path) {
- this.mapFn.setConfigurationForTest(conf);
this.mapFn.initialize();
try {
FsInput fsi = new FsInput(path, fs.getConf());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 050c0fc..ad1b81b 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -54,7 +54,6 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
@Override
public Iterator<T> read(FileSystem fs, final Path path) {
- mapFn.setConfigurationForTest(conf);
mapFn.initialize();
try {
final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
index 7c34a75..20c749a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -59,9 +59,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
@Override
public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
- keyMapFn.setConfigurationForTest(conf);
keyMapFn.initialize();
- valueMapFn.setConfigurationForTest(conf);
valueMapFn.initialize();
try {
final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index 5a512fc..a0c48e0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -63,7 +63,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
mapFn = ((CompositeMapFn) input).getSecond();
}
}
- mapFn.setConfigurationForTest(conf);
mapFn.initialize();
FSDataInputStream is;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
deleted file mode 100644
index 1e0acb9..0000000
--- a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-
-import com.google.common.collect.Lists;
-
-/**
- * An {@code Emitter} instance that writes emitted records to a backing
- * {@code List}.
- *
- * @param <T>
- */
-public class InMemoryEmitter<T> implements Emitter<T> {
-
- private final List<T> output;
-
- public InMemoryEmitter() {
- this(Lists.<T> newArrayList());
- }
-
- public InMemoryEmitter(List<T> output) {
- this.output = output;
- }
-
- @Override
- public void emit(T emitted) {
- output.add(emitted);
- }
-
- @Override
- public void flush() {
-
- }
-
- public List<T> getOutput() {
- return output;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
deleted file mode 100644
index bcb4da1..0000000
--- a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-/**
- * A utility class used during unit testing to update and read counters.
- */
-public class TestCounters {
-
- private static Counters COUNTERS = new Counters();
-
- public static Counter getCounter(Enum<?> e) {
- return COUNTERS.findCounter(e);
- }
-
- public static Counter getCounter(String group, String name) {
- return COUNTERS.findCounter(group, name);
- }
-
- public static void clearCounters() {
- COUNTERS = new Counters();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 285b423..5416c4f 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -57,12 +57,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- keyMapFn.setConfigurationForTest(conf);
- valueMapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
keyMapFn.setContext(getContext());
valueMapFn.setContext(getContext());
@@ -99,12 +93,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- firstMapFn.setConfigurationForTest(conf);
- secondMapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
firstMapFn.setContext(getContext());
secondMapFn.setContext(getContext());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 969af1d..4a83db5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -291,11 +291,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
this.mapFn.setContext(getContext());
}
@@ -335,11 +330,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
this.mapFn.setContext(getContext());
}
@@ -378,11 +368,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
this.mapFn.setContext(getContext());
}
@@ -410,11 +395,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
this.mapFn.setContext(getContext());
}
@@ -460,13 +440,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
-
- @Override
public void initialize() {
for (MapFn fn : fns) {
fn.setContext(getContext());
@@ -527,13 +500,6 @@ public class Avros {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
-
- @Override
public void initialize() {
this.schema = new Schema.Parser().parse(jsonSchema);
for (MapFn fn : fns) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index 23bc7f5..5e305b8 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -297,13 +297,6 @@ public class Writables {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
-
- @Override
public void initialize() {
for (MapFn fn : fns) {
fn.setContext(getContext());
@@ -353,12 +346,6 @@ public class Writables {
}
}
- @Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
@Override
public void initialize() {
@@ -439,10 +426,6 @@ public class Writables {
mapFn.configure(conf);
}
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
@Override
public void initialize() {
@@ -474,10 +457,6 @@ public class Writables {
mapFn.configure(conf);
}
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
@Override
public void initialize() {
@@ -516,11 +495,6 @@ public class Writables {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
mapFn.setContext(getContext());
}
@@ -551,11 +525,6 @@ public class Writables {
}
@Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
public void initialize() {
mapFn.setContext(getContext());
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
index 741899e..9e4337f 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
+import org.apache.crunch.test.CrunchTestSupport;
import org.apache.crunch.test.StringWrapper;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
@@ -41,7 +42,7 @@ public abstract class JoinFnTestBase {
@Before
public void setUp() {
joinFn = getJoinFn();
- joinFn.setConfigurationForTest(new Configuration());
+ joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
joinFn.initialize();
emitter = mock(Emitter.class);
}
@@ -67,13 +68,11 @@ public abstract class JoinFnTestBase {
}
- protected abstract void checkOutput(
- Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
+ protected abstract void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
protected abstract JoinFn<StringWrapper, StringWrapper, String> getJoinFn();
- protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue,
- String rightValue) {
+ protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) {
Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
valuePairList.add(valuePair);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
index 3df7657..66f854e 100644
--- a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
+++ b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
-import org.junit.After;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
/**
@@ -35,12 +35,11 @@ public class CountersTest {
THREE
};
- @After
- public void after() {
- TestCounters.clearCounters();
- }
-
public static class CTFn extends DoFn<String, String> {
+ CTFn() {
+ setContext(CrunchTestSupport.getTestContext(new Configuration()));
+ }
+
@Override
public void process(String input, Emitter<String> emitter) {
getCounter(CT.ONE).increment(1);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index 801829d..dabf0fe 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -37,13 +37,16 @@ import org.apache.crunch.Pair;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.test.CrunchTestSupport;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.types.DeepCopier;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@@ -186,6 +189,11 @@ public class AvrosTest {
@SuppressWarnings("rawtypes")
public void testWritables() throws Exception {
AvroType at = Avros.writables(LongWritable.class);
+
+ TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration());
+ at.getInputMapFn().setContext(testContext);
+ at.getOutputMapFn().setContext(testContext);
+
LongWritable lw = new LongWritable(1729L);
assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw)));
}
@@ -222,9 +230,9 @@ public class AvrosTest {
public void testIsPrimitive_PrimitiveMappedType() {
assertTrue(Avros.isPrimitive(Avros.ints()));
}
-
+
@Test
- public void testIsPrimitive_TruePrimitiveValue(){
+ public void testIsPrimitive_TruePrimitiveValue() {
AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier());
assertTrue(Avros.isPrimitive(truePrimitiveAvroType));
}
@@ -294,20 +302,20 @@ public class AvrosTest {
assertEquals(pair, doubleMappedPair);
}
-
+
@Test
- public void testPairOutputMapFn_VerifyNoObjectReuse(){
+ public void testPairOutputMapFn_VerifyNoObjectReuse() {
StringWrapper stringWrapper = new StringWrapper("Test");
-
- Pair<Integer,StringWrapper> pair = Pair.of(1, stringWrapper);
-
+
+ Pair<Integer, StringWrapper> pair = Pair.of(1, stringWrapper);
+
AvroType<Pair<Integer, StringWrapper>> pairType = Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class));
-
+
pairType.getOutputMapFn().initialize();
-
+
Object outputMappedValueA = pairType.getOutputMapFn().map(pair);
Object outputMappedValueB = pairType.getOutputMapFn().map(pair);
-
+
assertEquals(outputMappedValueA, outputMappedValueB);
assertNotSame(outputMappedValueA, outputMappedValueB);
}