You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:16 UTC
[14/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
deleted file mode 100644
index 84c720c..0000000
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Manages the state of a pipeline execution.
- *
- */
-public interface Pipeline {
-
- /**
- * Set the {@code Configuration} to use with this pipeline.
- */
- void setConfiguration(Configuration conf);
-
- /**
- * Returns the name of this pipeline.
- *
- * @return Name of the pipeline
- */
- String getName();
-
- /**
- * Returns the {@code Configuration} instance associated with this pipeline.
- */
- Configuration getConfiguration();
-
- /**
- * Converts the given {@code Source} into a {@code PCollection} that is
- * available to jobs run using this {@code Pipeline} instance.
- *
- * @param source
- * The source of data
- * @return A PCollection that references the given source
- */
- <T> PCollection<T> read(Source<T> source);
-
- /**
- * A version of the read method for {@code TableSource} instances that map to
- * {@code PTable}s.
- *
- * @param tableSource
- * The source of the data
- * @return A PTable that references the given source
- */
- <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
-
- /**
- * Write the given collection to the given target on the next pipeline run. The
- * system will check to see if the target's location already exists using the
- * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
- *
- * @param collection
- * The collection
- * @param target
- * The output target
- */
- void write(PCollection<?> collection, Target target);
-
- /**
- * Write the contents of the {@code PCollection} to the given {@code Target},
- * using the storage format specified by the target and the given
- * {@code WriteMode} for cases where the referenced {@code Target}
- * already exists.
- *
- * @param collection
- * The collection
- * @param target
- * The target to write to
- * @param writeMode
- * The strategy to use for handling existing outputs
- */
- void write(PCollection<?> collection, Target target,
- Target.WriteMode writeMode);
-
- /**
- * Create the given PCollection and read the data it contains into the
- * returned Collection instance for client use.
- *
- * @param pcollection
- * The PCollection to materialize
- * @return the data from the PCollection as a read-only Collection
- */
- <T> Iterable<T> materialize(PCollection<T> pcollection);
-
- /**
- * Constructs and executes a series of MapReduce jobs in order to write data
- * to the output targets.
- */
- PipelineResult run();
-
- /**
- * Constructs and starts a series of MapReduce jobs in order ot write data to
- * the output targets, but returns a {@code ListenableFuture} to allow clients to control
- * job execution.
- * @return
- */
- PipelineExecution runAsync();
-
- /**
- * Run any remaining jobs required to generate outputs and then clean up any
- * intermediate data files that were created in this run or previous calls to
- * {@code run}.
- */
- PipelineResult done();
-
- /**
- * A convenience method for reading a text file.
- */
- PCollection<String> readTextFile(String pathName);
-
- /**
- * A convenience method for writing a text file.
- */
- <T> void writeTextFile(PCollection<T> collection, String pathName);
-
- /**
- * Turn on debug logging for jobs that are run from this pipeline.
- */
- void enableDebug();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
deleted file mode 100644
index fc6bb91..0000000
--- a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A handle to allow clients to control a Crunch pipeline as it runs.
- *
- * This interface is thread-safe.
- */
-public interface PipelineExecution {
-
- enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
-
- /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
- * pipeline.
- */
- String getPlanDotFile();
-
- /** Blocks until pipeline completes or the specified waiting time elapsed. */
- void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
-
- /** Blocks until pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
- void waitUntilDone() throws InterruptedException;
-
- Status getStatus();
-
- /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */
- PipelineResult getResult();
-
- /**
- * Kills the pipeline if it is running, no-op otherwise.
- *
- * This method only delivers a kill signal to the pipeline, and does not guarantee the pipeline exits on return.
- * To wait for completely exits, use {@link #waitUntilDone()} after this call.
- */
- void kill() throws InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineResult.java b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
deleted file mode 100644
index 90b1067..0000000
--- a/crunch/src/main/java/org/apache/crunch/PipelineResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Container for the results of a call to {@code run} or {@code done} on the
- * Pipeline interface that includes details and statistics about the component
- * stages of the data pipeline.
- */
-public class PipelineResult {
-
- public static class StageResult {
-
- private final String stageName;
- private final Counters counters;
-
- public StageResult(String stageName, Counters counters) {
- this.stageName = stageName;
- this.counters = counters;
- }
-
- public String getStageName() {
- return stageName;
- }
-
- public Counters getCounters() {
- return counters;
- }
-
- public Counter findCounter(Enum<?> key) {
- return counters.findCounter(key);
- }
-
- public long getCounterValue(Enum<?> key) {
- return findCounter(key).getValue();
- }
- }
-
- public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of());
-
- private final List<StageResult> stageResults;
-
- public PipelineResult(List<StageResult> stageResults) {
- this.stageResults = ImmutableList.copyOf(stageResults);
- }
-
- public boolean succeeded() {
- return !stageResults.isEmpty();
- }
-
- public List<StageResult> getStageResults() {
- return stageResults;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Source.java b/crunch/src/main/java/org/apache/crunch/Source.java
deleted file mode 100644
index f54d135..0000000
--- a/crunch/src/main/java/org/apache/crunch/Source.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A {@code Source} represents an input data set that is an input to one or more
- * MapReduce jobs.
- *
- */
-public interface Source<T> {
- /**
- * Returns the {@code PType} for this source.
- */
- PType<T> getType();
-
- /**
- * Configure the given job to use this source as an input.
- *
- * @param job
- * The job to configure
- * @param inputId
- * For a multi-input job, an identifier for this input to the job
- * @throws IOException
- */
- void configureSource(Job job, int inputId) throws IOException;
-
- /**
- * Returns the number of bytes in this {@code Source}.
- */
- long getSize(Configuration configuration);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/SourceTarget.java b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
deleted file mode 100644
index 09c03c6..0000000
--- a/crunch/src/main/java/org/apache/crunch/SourceTarget.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * An interface for classes that implement both the {@code Source} and the
- * {@code Target} interfaces.
- *
- */
-public interface SourceTarget<T> extends Source<T>, Target {
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSource.java b/crunch/src/main/java/org/apache/crunch/TableSource.java
deleted file mode 100644
index ff27346..0000000
--- a/crunch/src/main/java/org/apache/crunch/TableSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.types.PTableType;
-
-/**
- * The interface {@code Source} implementations that return a {@link PTable}.
- *
- */
-public interface TableSource<K, V> extends Source<Pair<K, V>> {
- PTableType<K, V> getTableType();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
deleted file mode 100644
index 9b1ed34..0000000
--- a/crunch/src/main/java/org/apache/crunch/TableSourceTarget.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * An interface for classes that implement both the {@code TableSource} and the
- * {@code Target} interfaces.
- */
-public interface TableSourceTarget<K, V> extends TableSource<K, V>, SourceTarget<Pair<K, V>> {
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
deleted file mode 100644
index 0a0c23d..0000000
--- a/crunch/src/main/java/org/apache/crunch/Target.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@code Target} represents the output destination of a Crunch {@code PCollection}
- * in the context of a Crunch job.
- */
-public interface Target {
-
- /**
- * An enum to represent different options the client may specify
- * for handling the case where the output path, table, etc. referenced
- * by a {@code Target} already exists.
- */
- enum WriteMode {
- /**
- * Check to see if the output target already exists before running
- * the pipeline, and if it does, print an error and throw an exception.
- */
- DEFAULT,
-
- /**
- * Check to see if the output target already exists, and if it does,
- * delete it and overwrite it with the new output (if any).
- */
- OVERWRITE,
-
- /**
- * If the output target does not exist, create it. If it does exist,
- * add the output of this pipeline to the target. This was the
- * behavior in Crunch up to version 0.4.0.
- */
- APPEND
- }
-
- /**
- * Apply the given {@code WriteMode} to this {@code Target} instance.
- *
- * @param writeMode The strategy for handling existing outputs
- * @param conf The ever-useful {@code Configuration} instance
- */
- void handleExisting(WriteMode writeMode, Configuration conf);
-
- /**
- * Checks to see if this {@code Target} instance is compatible with the
- * given {@code PType}.
- *
- * @param handler The {@link OutputHandler} that is managing the output for the job
- * @param ptype The {@code PType} to check
- * @return True if this Target can write data in the form of the given {@code PType},
- * false otherwise
- */
- boolean accept(OutputHandler handler, PType<?> ptype);
-
- /**
- * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
- * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
- *
- * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
- * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist
- */
- <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple.java b/crunch/src/main/java/org/apache/crunch/Tuple.java
deleted file mode 100644
index 4e602ff..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A fixed-size collection of Objects, used in Crunch for representing joins
- * between {@code PCollection}s.
- *
- */
-public interface Tuple {
-
- /**
- * Returns the Object at the given index.
- */
- Object get(int index);
-
- /**
- * Returns the number of elements in this Tuple.
- */
- int size();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple3.java b/crunch/src/main/java/org/apache/crunch/Tuple3.java
deleted file mode 100644
index 4372811..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple3.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for three-element {@link Tuple}s.
- */
-public class Tuple3<V1, V2, V3> implements Tuple {
-
- private final V1 first;
- private final V2 second;
- private final V3 third;
-
- public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
- return new Tuple3<A, B, C>(a, b, c);
- }
-
- public Tuple3(V1 first, V2 second, V3 third) {
- this.first = first;
- this.second = second;
- this.third = third;
- }
-
- public V1 first() {
- return first;
- }
-
- public V2 second() {
- return second;
- }
-
- public V3 third() {
- return third;
- }
-
- public Object get(int index) {
- switch (index) {
- case 0:
- return first;
- case 1:
- return second;
- case 2:
- return third;
- default:
- throw new ArrayIndexOutOfBoundsException();
- }
- }
-
- public int size() {
- return 3;
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- return hcb.append(first).append(second).append(third).toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
- return (first == other.first || (first != null && first.equals(other.first)))
- && (second == other.second || (second != null && second.equals(other.second)))
- && (third == other.third || (third != null && third.equals(other.third)));
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("Tuple3[");
- sb.append(first).append(",").append(second).append(",").append(third);
- return sb.append("]").toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple4.java b/crunch/src/main/java/org/apache/crunch/Tuple4.java
deleted file mode 100644
index f161371..0000000
--- a/crunch/src/main/java/org/apache/crunch/Tuple4.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for four-element {@link Tuple}s.
- */
-public class Tuple4<V1, V2, V3, V4> implements Tuple {
-
- private final V1 first;
- private final V2 second;
- private final V3 third;
- private final V4 fourth;
-
- public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
- return new Tuple4<A, B, C, D>(a, b, c, d);
- }
-
- public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
- this.first = first;
- this.second = second;
- this.third = third;
- this.fourth = fourth;
- }
-
- public V1 first() {
- return first;
- }
-
- public V2 second() {
- return second;
- }
-
- public V3 third() {
- return third;
- }
-
- public V4 fourth() {
- return fourth;
- }
-
- public Object get(int index) {
- switch (index) {
- case 0:
- return first;
- case 1:
- return second;
- case 2:
- return third;
- case 3:
- return fourth;
- default:
- throw new ArrayIndexOutOfBoundsException();
- }
- }
-
- public int size() {
- return 4;
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- return hcb.append(first).append(second).append(third).append(fourth).toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
- return (first == other.first || (first != null && first.equals(other.first)))
- && (second == other.second || (second != null && second.equals(other.second)))
- && (third == other.third || (third != null && third.equals(other.third)))
- && (fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("Tuple4[");
- sb.append(first).append(",").append(second).append(",").append(third);
- return sb.append(",").append(fourth).append("]").toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TupleN.java b/crunch/src/main/java/org/apache/crunch/TupleN.java
deleted file mode 100644
index e5eceb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/TupleN.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A {@link Tuple} instance for an arbitrary number of values.
- */
-public class TupleN implements Tuple {
-
- private final Object values[];
-
- public static TupleN of(Object... values) {
- return new TupleN(values);
- }
-
- public TupleN(Object... values) {
- this.values = new Object[values.length];
- System.arraycopy(values, 0, this.values, 0, values.length);
- }
-
- public Object get(int index) {
- return values[index];
- }
-
- public int size() {
- return values.length;
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder hcb = new HashCodeBuilder();
- for (Object v : values) {
- hcb.append(v);
- }
- return hcb.toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TupleN other = (TupleN) obj;
- return Arrays.equals(this.values, other.values);
- }
-
- @Override
- public String toString() {
- return Arrays.toString(values);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
deleted file mode 100644
index 0ac79e2..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ /dev/null
@@ -1,1111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import java.math.BigInteger;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.crunch.Aggregator;
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.util.Tuples;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-
-/**
- * A collection of pre-defined {@link org.apache.crunch.Aggregator}s.
- *
- * <p>The factory methods of this class return {@link org.apache.crunch.Aggregator}
- * instances that you can use to combine the values of a {@link PGroupedTable}.
- * In most cases, they turn a multimap (multiple entries per key) into a map (one
- * entry per key).</p>
- *
- * <p><strong>Note</strong>: When using composed aggregators, like those built by the
- * {@link #pairAggregator(Aggregator, Aggregator) pairAggregator()}
- * factory method, you typically don't want to put in the same child aggregator more than once,
- * even if all child aggregators have the same type. In most cases, this is what you want:</p>
- *
- * <pre>
- * PTable<K, Long> result = groupedTable.combineValues(
- * pairAggregator(SUM_LONGS(), SUM_LONGS())
- * );
- * </pre>
- */
-public final class Aggregators {
-
- private Aggregators() {
- // utility class, not for instantiation
- }
-
- /**
- * Sum up all {@code long} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Long> SUM_LONGS() {
- return new SumLongs();
- }
-
- /**
- * Sum up all {@code int} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Integer> SUM_INTS() {
- return new SumInts();
- }
-
- /**
- * Sum up all {@code float} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Float> SUM_FLOATS() {
- return new SumFloats();
- }
-
- /**
- * Sum up all {@code double} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Double> SUM_DOUBLES() {
- return new SumDoubles();
- }
-
- /**
- * Sum up all {@link BigInteger} values.
- * @return The newly constructed instance
- */
- public static Aggregator<BigInteger> SUM_BIGINTS() {
- return new SumBigInts();
- }
-
- /**
- * Return the maximum of all given {@code long} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Long> MAX_LONGS() {
- return new MaxLongs();
- }
-
- /**
- * Return the {@code n} largest {@code long} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Long> MAX_LONGS(int n) {
- return new MaxLongs();
- }
-
- /**
- * Return the maximum of all given {@code int} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Integer> MAX_INTS() {
- return new MaxInts();
- }
-
- /**
- * Return the {@code n} largest {@code int} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Integer> MAX_INTS(int n) {
- return new MaxNAggregator<Integer>(n);
- }
-
- /**
- * Return the maximum of all given {@code float} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Float> MAX_FLOATS() {
- return new MaxFloats();
- }
-
- /**
- * Return the {@code n} largest {@code float} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Float> MAX_FLOATS(int n) {
- return new MaxNAggregator<Float>(n);
- }
-
- /**
- * Return the maximum of all given {@code double} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Double> MAX_DOUBLES() {
- return new MaxDoubles();
- }
-
- /**
- * Return the {@code n} largest {@code double} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Double> MAX_DOUBLES(int n) {
- return new MaxNAggregator<Double>(n);
- }
-
- /**
- * Return the maximum of all given {@link BigInteger} values.
- * @return The newly constructed instance
- */
- public static Aggregator<BigInteger> MAX_BIGINTS() {
- return new MaxBigInts();
- }
-
- /**
- * Return the {@code n} largest {@link BigInteger} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
- return new MaxNAggregator<BigInteger>(n);
- }
-
- /**
- * Return the {@code n} largest values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
- * @return The newly constructed instance
- */
- public static <V extends Comparable<V>> Aggregator<V> MAX_N(int n, Class<V> cls) {
- return new MaxNAggregator<V>(n);
- }
-
- /**
- * Return the minimum of all given {@code long} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Long> MIN_LONGS() {
- return new MinLongs();
- }
-
- /**
- * Return the {@code n} smallest {@code long} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Long> MIN_LONGS(int n) {
- return new MinNAggregator<Long>(n);
- }
-
- /**
- * Return the minimum of all given {@code int} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Integer> MIN_INTS() {
- return new MinInts();
- }
-
- /**
- * Return the {@code n} smallest {@code int} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Integer> MIN_INTS(int n) {
- return new MinNAggregator<Integer>(n);
- }
-
- /**
- * Return the minimum of all given {@code float} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Float> MIN_FLOATS() {
- return new MinFloats();
- }
-
- /**
- * Return the {@code n} smallest {@code float} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Float> MIN_FLOATS(int n) {
- return new MinNAggregator<Float>(n);
- }
-
- /**
- * Return the minimum of all given {@code double} values.
- * @return The newly constructed instance
- */
- public static Aggregator<Double> MIN_DOUBLES() {
- return new MinDoubles();
- }
-
- /**
- * Return the {@code n} smallest {@code double} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<Double> MIN_DOUBLES(int n) {
- return new MinNAggregator<Double>(n);
- }
-
- /**
- * Return the minimum of all given {@link BigInteger} values.
- * @return The newly constructed instance
- */
- public static Aggregator<BigInteger> MIN_BIGINTS() {
- return new MinBigInts();
- }
-
- /**
- * Return the {@code n} smallest {@link BigInteger} values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
- return new MinNAggregator<BigInteger>(n);
- }
-
- /**
- * Return the {@code n} smallest values (or fewer if there are fewer
- * values than {@code n}).
- * @param n The number of values to return
- * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
- * @return The newly constructed instance
- */
- public static <V extends Comparable<V>> Aggregator<V> MIN_N(int n, Class<V> cls) {
- return new MinNAggregator<V>(n);
- }
-
- /**
- * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
- *
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static <V> Aggregator<V> FIRST_N(int n) {
- return new FirstNAggregator<V>(n);
- }
-
- /**
- * Return the last {@code n} values (or fewer if there are fewer values than {@code n}).
- *
- * @param n The number of values to return
- * @return The newly constructed instance
- */
- public static <V> Aggregator<V> LAST_N(int n) {
- return new LastNAggregator<V>(n);
- }
-
- /**
- * Concatenate strings, with a separator between strings. There
- * is no limits of length for the concatenated string.
- *
- * <p><em>Note: String concatenation is not commutative, which means the
- * result of the aggregation is not deterministic!</em></p>
- *
- * @param separator
- * the separator which will be appended between each string
- * @param skipNull
- * define if we should skip null values. Throw
- * NullPointerException if set to false and there is a null
- * value.
- * @return The newly constructed instance
- */
- public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
- return new StringConcatAggregator(separator, skipNull);
- }
-
- /**
- * Concatenate strings, with a separator between strings. You can specify
- * the maximum length of the output string and of the input strings, if
- * they are > 0. If a value is <= 0, there is no limit.
- *
- * <p>Any too large string (or any string which would made the output too
- * large) will be silently discarded.</p>
- *
- * <p><em>Note: String concatenation is not commutative, which means the
- * result of the aggregation is not deterministic!</em></p>
- *
- * @param separator
- * the separator which will be appended between each string
- * @param skipNull
- * define if we should skip null values. Throw
- * NullPointerException if set to false and there is a null
- * value.
- * @param maxOutputLength
- * the maximum length of the output string. If it's set <= 0,
- * there is no limit. The number of characters of the output
- * string will be < maxOutputLength.
- * @param maxInputLength
- * the maximum length of the input strings. If it's set <= 0,
- * there is no limit. The number of characters of the input string
- * will be < maxInputLength to be concatenated.
- * @return The newly constructed instance
- */
- public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
- long maxOutputLength, long maxInputLength) {
- return new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength);
- }
-
- /**
- * Collect the unique elements of the input, as defined by the {@code equals} method for
- * the input objects. No guarantees are made about the order in which the final elements
- * will be returned.
- *
- * @return The newly constructed instance
- */
- public static <V> Aggregator<V> UNIQUE_ELEMENTS() {
- return new SetAggregator<V>();
- }
-
- /**
- * Collect a sample of unique elements from the input, where 'unique' is defined by
- * the {@code equals} method for the input objects. No guarantees are made about which
- * elements will be returned, simply that there will not be any more than the given sample
- * size for any key.
- *
- * @param maximumSampleSize The maximum number of unique elements to return per key
- * @return The newly constructed instance
- */
- public static <V> Aggregator<V> SAMPLE_UNIQUE_ELEMENTS(int maximumSampleSize) {
- return new SetAggregator<V>(maximumSampleSize);
- }
-
- /**
- * Apply separate aggregators to each component of a {@link Pair}.
- */
- public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
- Aggregator<V1> a1, Aggregator<V2> a2) {
- return new PairAggregator<V1, V2>(a1, a2);
- }
-
- /**
- * Apply separate aggregators to each component of a {@link Tuple3}.
- */
- public static <V1, V2, V3> Aggregator<Tuple3<V1, V2, V3>> tripAggregator(
- Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3) {
- return new TripAggregator<V1, V2, V3>(a1, a2, a3);
- }
-
- /**
- * Apply separate aggregators to each component of a {@link Tuple4}.
- */
- public static <V1, V2, V3, V4> Aggregator<Tuple4<V1, V2, V3, V4>> quadAggregator(
- Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3, Aggregator<V4> a4) {
- return new QuadAggregator<V1, V2, V3, V4>(a1, a2, a3, a4);
- }
-
- /**
- * Apply separate aggregators to each component of a {@link Tuple}.
- */
- public static Aggregator<TupleN> tupleAggregator(Aggregator<?>... aggregators) {
- return new TupleNAggregator(aggregators);
- }
-
- /**
- * Wrap a {@link CombineFn} adapter around the given aggregator.
- *
- * @param aggregator The instance to wrap
- * @return A {@link CombineFn} delegating to {@code aggregator}
- */
- public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
- return new AggregatorCombineFn<K, V>(aggregator);
- }
-
- /**
- * Base class for aggregators that do not require any initialization.
- */
- public static abstract class SimpleAggregator<T> implements Aggregator<T> {
- @Override
- public void initialize(Configuration conf) {
- // No-op
- }
- }
-
- /**
- * A {@code CombineFn} that delegates all of the actual work to an
- * {@code Aggregator} instance.
- */
- private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
- // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
- private final org.apache.crunch.Aggregator<V> aggregator;
-
- public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
- this.aggregator = aggregator;
- }
-
- @Override
- public void initialize() {
- aggregator.initialize(getConfiguration());
- }
-
- @Override
- public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
- aggregator.reset();
- for (V v : input.second()) {
- aggregator.update(v);
- }
- for (V v : aggregator.results()) {
- emitter.emit(Pair.of(input.first(), v));
- }
- }
- }
-
- private static class SumLongs extends SimpleAggregator<Long> {
- private long sum = 0;
-
- @Override
- public void reset() {
- sum = 0;
- }
-
- @Override
- public void update(Long next) {
- sum += next;
- }
-
- @Override
- public Iterable<Long> results() {
- return ImmutableList.of(sum);
- }
- }
-
- private static class SumInts extends SimpleAggregator<Integer> {
- private int sum = 0;
-
- @Override
- public void reset() {
- sum = 0;
- }
-
- @Override
- public void update(Integer next) {
- sum += next;
- }
-
- @Override
- public Iterable<Integer> results() {
- return ImmutableList.of(sum);
- }
- }
-
- private static class SumFloats extends SimpleAggregator<Float> {
- private float sum = 0;
-
- @Override
- public void reset() {
- sum = 0f;
- }
-
- @Override
- public void update(Float next) {
- sum += next;
- }
-
- @Override
- public Iterable<Float> results() {
- return ImmutableList.of(sum);
- }
- }
-
- private static class SumDoubles extends SimpleAggregator<Double> {
- private double sum = 0;
-
- @Override
- public void reset() {
- sum = 0f;
- }
-
- @Override
- public void update(Double next) {
- sum += next;
- }
-
- @Override
- public Iterable<Double> results() {
- return ImmutableList.of(sum);
- }
- }
-
- private static class SumBigInts extends SimpleAggregator<BigInteger> {
- private BigInteger sum = BigInteger.ZERO;
-
- @Override
- public void reset() {
- sum = BigInteger.ZERO;
- }
-
- @Override
- public void update(BigInteger next) {
- sum = sum.add(next);
- }
-
- @Override
- public Iterable<BigInteger> results() {
- return ImmutableList.of(sum);
- }
- }
-
- private static class MaxLongs extends SimpleAggregator<Long> {
- private Long max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void update(Long next) {
- if (max == null || max < next) {
- max = next;
- }
- }
-
- @Override
- public Iterable<Long> results() {
- return ImmutableList.of(max);
- }
- }
-
- private static class MaxInts extends SimpleAggregator<Integer> {
- private Integer max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void update(Integer next) {
- if (max == null || max < next) {
- max = next;
- }
- }
-
- @Override
- public Iterable<Integer> results() {
- return ImmutableList.of(max);
- }
- }
-
- private static class MaxFloats extends SimpleAggregator<Float> {
- private Float max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void update(Float next) {
- if (max == null || max < next) {
- max = next;
- }
- }
-
- @Override
- public Iterable<Float> results() {
- return ImmutableList.of(max);
- }
- }
-
- private static class MaxDoubles extends SimpleAggregator<Double> {
- private Double max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void update(Double next) {
- if (max == null || max < next) {
- max = next;
- }
- }
-
- @Override
- public Iterable<Double> results() {
- return ImmutableList.of(max);
- }
- }
-
- private static class MaxBigInts extends SimpleAggregator<BigInteger> {
- private BigInteger max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void update(BigInteger next) {
- if (max == null || max.compareTo(next) < 0) {
- max = next;
- }
- }
-
- @Override
- public Iterable<BigInteger> results() {
- return ImmutableList.of(max);
- }
- }
-
- private static class MinLongs extends SimpleAggregator<Long> {
- private Long min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void update(Long next) {
- if (min == null || min > next) {
- min = next;
- }
- }
-
- @Override
- public Iterable<Long> results() {
- return ImmutableList.of(min);
- }
- }
-
- private static class MinInts extends SimpleAggregator<Integer> {
- private Integer min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void update(Integer next) {
- if (min == null || min > next) {
- min = next;
- }
- }
-
- @Override
- public Iterable<Integer> results() {
- return ImmutableList.of(min);
- }
- }
-
- private static class MinFloats extends SimpleAggregator<Float> {
- private Float min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void update(Float next) {
- if (min == null || min > next) {
- min = next;
- }
- }
-
- @Override
- public Iterable<Float> results() {
- return ImmutableList.of(min);
- }
- }
-
- private static class MinDoubles extends SimpleAggregator<Double> {
- private Double min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void update(Double next) {
- if (min == null || min > next) {
- min = next;
- }
- }
-
- @Override
- public Iterable<Double> results() {
- return ImmutableList.of(min);
- }
- }
-
- private static class MinBigInts extends SimpleAggregator<BigInteger> {
- private BigInteger min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void update(BigInteger next) {
- if (min == null || min.compareTo(next) > 0) {
- min = next;
- }
- }
-
- @Override
- public Iterable<BigInteger> results() {
- return ImmutableList.of(min);
- }
- }
-
- private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
- private final int arity;
- private transient SortedSet<V> elements;
-
- public MaxNAggregator(int arity) {
- this.arity = arity;
- }
-
- @Override
- public void reset() {
- if (elements == null) {
- elements = Sets.newTreeSet();
- } else {
- elements.clear();
- }
- }
-
- @Override
- public void update(V value) {
- if (elements.size() < arity) {
- elements.add(value);
- } else if (value.compareTo(elements.first()) > 0) {
- elements.remove(elements.first());
- elements.add(value);
- }
- }
-
- @Override
- public Iterable<V> results() {
- return ImmutableList.copyOf(elements);
- }
- }
-
- private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
- private final int arity;
- private transient SortedSet<V> elements;
-
- public MinNAggregator(int arity) {
- this.arity = arity;
- }
-
- @Override
- public void reset() {
- if (elements == null) {
- elements = Sets.newTreeSet();
- } else {
- elements.clear();
- }
- }
-
- @Override
- public void update(V value) {
- if (elements.size() < arity) {
- elements.add(value);
- } else if (value.compareTo(elements.last()) < 0) {
- elements.remove(elements.last());
- elements.add(value);
- }
- }
-
- @Override
- public Iterable<V> results() {
- return ImmutableList.copyOf(elements);
- }
- }
-
- private static class FirstNAggregator<V> extends SimpleAggregator<V> {
- private final int arity;
- private final List<V> elements;
-
- public FirstNAggregator(int arity) {
- this.arity = arity;
- this.elements = Lists.newArrayList();
- }
-
- @Override
- public void reset() {
- elements.clear();
- }
-
- @Override
- public void update(V value) {
- if (elements.size() < arity) {
- elements.add(value);
- }
- }
-
- @Override
- public Iterable<V> results() {
- return ImmutableList.copyOf(elements);
- }
- }
-
- private static class LastNAggregator<V> extends SimpleAggregator<V> {
- private final int arity;
- private final LinkedList<V> elements;
-
- public LastNAggregator(int arity) {
- this.arity = arity;
- this.elements = Lists.newLinkedList();
- }
-
- @Override
- public void reset() {
- elements.clear();
- }
-
- @Override
- public void update(V value) {
- elements.add(value);
- if (elements.size() == arity + 1) {
- elements.removeFirst();
- }
- }
-
- @Override
- public Iterable<V> results() {
- return ImmutableList.copyOf(elements);
- }
- }
-
- private static class StringConcatAggregator extends SimpleAggregator<String> {
- private final String separator;
- private final boolean skipNulls;
- private final long maxOutputLength;
- private final long maxInputLength;
- private long currentLength;
- private final LinkedList<String> list = new LinkedList<String>();
-
- private transient Joiner joiner;
-
- public StringConcatAggregator(final String separator, final boolean skipNulls) {
- this.separator = separator;
- this.skipNulls = skipNulls;
- this.maxInputLength = 0;
- this.maxOutputLength = 0;
- }
-
- public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
- this.separator = separator;
- this.skipNulls = skipNull;
- this.maxOutputLength = maxOutputLength;
- this.maxInputLength = maxInputLength;
- this.currentLength = -separator.length();
- }
-
- @Override
- public void reset() {
- if (joiner == null) {
- joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
- }
- currentLength = -separator.length();
- list.clear();
- }
-
- @Override
- public void update(final String next) {
- long length = (next == null) ? 0 : next.length() + separator.length();
- if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
- return;
- }
- if (maxOutputLength > 0) {
- currentLength += length;
- }
- list.add(next);
- }
-
- @Override
- public Iterable<String> results() {
- return ImmutableList.of(joiner.join(list));
- }
- }
-
-
- private static abstract class TupleAggregator<T> implements Aggregator<T> {
- private final List<Aggregator<Object>> aggregators;
-
- @SuppressWarnings("unchecked")
- public TupleAggregator(Aggregator<?>... aggregators) {
- this.aggregators = Lists.newArrayList();
- for (Aggregator<?> a : aggregators) {
- this.aggregators.add((Aggregator<Object>) a);
- }
- }
-
- @Override
- public void initialize(Configuration configuration) {
- for (Aggregator<?> a : aggregators) {
- a.initialize(configuration);
- }
- }
-
- @Override
- public void reset() {
- for (Aggregator<?> a : aggregators) {
- a.reset();
- }
- }
-
- protected void updateTuple(Tuple t) {
- for (int i = 0; i < aggregators.size(); i++) {
- aggregators.get(i).update(t.get(i));
- }
- }
-
- protected Iterable<Object> results(int index) {
- return aggregators.get(index).results();
- }
- }
-
- private static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-
- public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
- super(a1, a2);
- }
-
- @Override
- public void update(Pair<V1, V2> value) {
- updateTuple(value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Iterable<Pair<V1, V2>> results() {
- return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
- }
- }
-
- private static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-
- public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
- super(a1, a2, a3);
- }
-
- @Override
- public void update(Tuple3<A, B, C> value) {
- updateTuple(value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Iterable<Tuple3<A, B, C>> results() {
- return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
- (Iterable<C>) results(2));
- }
- }
-
- private static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-
- public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
- super(a1, a2, a3, a4);
- }
-
- @Override
- public void update(Tuple4<A, B, C, D> value) {
- updateTuple(value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Iterable<Tuple4<A, B, C, D>> results() {
- return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
- (Iterable<C>) results(2), (Iterable<D>) results(3));
- }
- }
-
- private static class TupleNAggregator extends TupleAggregator<TupleN> {
- private final int size;
-
- public TupleNAggregator(Aggregator<?>... aggregators) {
- super(aggregators);
- size = aggregators.length;
- }
-
- @Override
- public void update(TupleN value) {
- updateTuple(value);
- }
-
- @Override
- public Iterable<TupleN> results() {
- Iterable<?>[] iterables = new Iterable[size];
- for (int i = 0; i < size; i++) {
- iterables[i] = results(i);
- }
- return new Tuples.TupleNIterable(iterables);
- }
- }
-
- private static class SetAggregator<V> extends SimpleAggregator<V> {
- private final Set<V> elements;
- private final int sizeLimit;
-
- public SetAggregator() {
- this(-1);
- }
-
- public SetAggregator(int sizeLimit) {
- this.elements = Sets.newHashSet();
- this.sizeLimit = sizeLimit;
- }
-
- @Override
- public void reset() {
- elements.clear();
- }
-
- @Override
- public void update(V value) {
- if (sizeLimit == -1 || elements.size() < sizeLimit) {
- elements.add(value);
- }
- }
-
- @Override
- public Iterable<V> results() {
- return ImmutableList.copyOf(elements);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
deleted file mode 100644
index 2a8e7d9..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-
- private final MapFn<R, S> first;
- private final MapFn<S, T> second;
-
- public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
- this.first = first;
- this.second = second;
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- first.setContext(context);
- second.setContext(context);
- }
-
- @Override
- public void initialize() {
- first.initialize();
- second.initialize();
- }
-
- public MapFn<R, S> getFirst() {
- return first;
- }
-
- public MapFn<S, T> getSecond() {
- return second;
- }
-
- @Override
- public T map(R input) {
- return second.map(first.map(input));
- }
-
- @Override
- public void cleanup(Emitter<T> emitter) {
- first.cleanup(null);
- second.cleanup(null);
- }
-
- @Override
- public void configure(Configuration conf) {
- first.configure(conf);
- second.configure(conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
deleted file mode 100644
index b8cc9df..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * Wrapper function for converting a {@code MapFn} into a key-value pair that is
- * used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
- */
-public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-
- private final MapFn<V, K> mapFn;
-
- public ExtractKeyFn(MapFn<V, K> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- mapFn.setContext(context);
- }
-
- @Override
- public void initialize() {
- mapFn.initialize();
- }
-
- @Override
- public Pair<K, V> map(V input) {
- return Pair.of(mapFn.map(input), input);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
deleted file mode 100644
index 8dc4268..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.FilterFn.AndFn;
-import org.apache.crunch.FilterFn.NotFn;
-import org.apache.crunch.FilterFn.OrFn;
-
-
-/**
- * A collection of pre-defined {@link FilterFn} implementations.
- */
-public final class FilterFns {
- // Note: We delegate to the deprecated implementation classes in FilterFn. When their
- // time is up, we just move them here.
-
- private FilterFns() {
- // utility class, not for instantiation
- }
-
- /**
- * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
- * @param fn1 The first functions to delegate to
- * @param fn2 The second functions to delegate to
- * @return The composed filter function
- */
- public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) {
- return new AndFn<S>(fn1, fn2);
- }
-
- /**
- * Accept an entry if all of the given filters accept it, using short-circuit evaluation.
- * @param fns The functions to delegate to (in the given order)
- * @return The composed filter function
- */
- public static <S> FilterFn<S> and(FilterFn<S>... fns) {
- return new AndFn<S>(fns);
- }
-
- /**
- * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
- * @param fn1 The first functions to delegate to
- * @param fn2 The second functions to delegate to
- * @return The composed filter function
- */
- public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) {
- return new OrFn<S>(fn1, fn2);
- }
-
- /**
- * Accept an entry if at least one of the given filters accept it, using short-circuit evaluation.
- * @param fns The functions to delegate to (in the given order)
- * @return The composed filter function
- */
- public static <S> FilterFn<S> or(FilterFn<S>... fns) {
- return new OrFn<S>(fns);
- }
-
- /**
- * Accept an entry if the given filter <em>does not</em> accept it.
- * @param fn The function to delegate to
- * @return The composed filter function
- */
- public static <S> FilterFn<S> not(FilterFn<S> fn) {
- return new NotFn<S>(fn);
- }
-
- /**
- * Accept everything.
- * @return A filter function that accepts everything.
- */
- public static <S> FilterFn<S> ACCEPT_ALL() {
- return new AcceptAllFn<S>();
- }
-
- /**
- * Reject everything.
- * @return A filter function that rejects everything.
- */
- public static <S> FilterFn<S> REJECT_ALL() {
- return not(new AcceptAllFn<S>());
- }
-
- private static class AcceptAllFn<S> extends FilterFn<S> {
- @Override
- public boolean accept(S input) {
- return true;
- }
-
- @Override
- public float scaleFactor() {
- return 1.0f;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
deleted file mode 100644
index 0eadb06..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.MapFn;
-
-public class IdentityFn<T> extends MapFn<T, T> {
-
- private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
-
- @SuppressWarnings("unchecked")
- public static <T> IdentityFn<T> getInstance() {
- return (IdentityFn<T>) INSTANCE;
- }
-
- // Non-instantiable
- private IdentityFn() {
- }
-
- @Override
- public T map(T input) {
- return input;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
deleted file mode 100644
index cbaf24d..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
-
- @Override
- public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
- emitter.emit(Pair.of(map(input.first()), input.second()));
- }
-
- public abstract K2 map(K1 k1);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
deleted file mode 100644
index b90f5ff..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
-
- @Override
- public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
- emitter.emit(Pair.of(input.first(), map(input.second())));
- }
-
- public abstract V2 map(V1 v);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
deleted file mode 100644
index 9ee4336..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
-
- private MapFn<K, S> keys;
- private MapFn<V, T> values;
-
- public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
- this.keys = keys;
- this.values = values;
- }
-
- @Override
- public void configure(Configuration conf) {
- keys.configure(conf);
- values.configure(conf);
- }
-
- @Override
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- keys.setContext(context);
- values.setContext(context);
- }
-
- @Override
- public void initialize() {
- keys.initialize();
- values.initialize();
- }
-
- @Override
- public Pair<S, T> map(Pair<K, V> input) {
- return Pair.of(keys.map(input.first()), values.map(input.second()));
- }
-
- @Override
- public void cleanup(Emitter<Pair<S, T>> emitter) {
- keys.cleanup(null);
- values.cleanup(null);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/package-info.java b/crunch/src/main/java/org/apache/crunch/fn/package-info.java
deleted file mode 100644
index acefdff..0000000
--- a/crunch/src/main/java/org/apache/crunch/fn/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Commonly used functions for manipulating collections.
- */
-package org.apache.crunch.fn;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
deleted file mode 100644
index 887c051..0000000
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.hadoop.mapreduce;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
- * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
- */
-@SuppressWarnings("unchecked")
-public class TaskAttemptContextFactory {
-
- private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
-
- private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-
- public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
- return INSTANCE.createInternal(conf, taskAttemptId);
- }
-
- private Constructor<TaskAttemptContext> taskAttemptConstructor;
-
- private TaskAttemptContextFactory() {
- Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
- if (implClass.isInterface()) {
- try {
- implClass = (Class<TaskAttemptContext>) Class.forName(
- "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
- } catch (ClassNotFoundException e) {
- LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
- }
- }
- try {
- this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
- } catch (Exception e) {
- LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
- }
- }
-
- private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
- try {
- return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
- } catch (Exception e) {
- LOG.error("Could not construct a TaskAttemptContext instance", e);
- return null;
- }
- }
-}