You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:09 UTC
[07/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
deleted file mode 100644
index a3d30d2..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an inner join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
- private transient K lastKey;
- private transient List<U> leftValues;
-
- public InnerJoinFn(PType<K> keyType, PType<U> leftValueType) {
- super(keyType, leftValueType);
- }
-
- /** {@inheritDoc} */
- @Override
- public void initialize() {
- super.initialize();
- lastKey = null;
- this.leftValues = Lists.newArrayList();
- }
-
- /** {@inheritDoc} */
- @Override
- public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
- if (!key.equals(lastKey)) {
- lastKey = keyType.getDetachedValue(key);
- leftValues.clear();
- }
- if (id == 0) { // from left
- for (Pair<U, V> pair : pairs) {
- if (pair.first() != null)
- leftValues.add(leftValueType.getDetachedValue(pair.first()));
- }
- } else { // from right
- for (Pair<U, V> pair : pairs) {
- for (U u : leftValues) {
- emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String getJoinType() {
- return "innerJoin";
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
deleted file mode 100644
index 99aea5a..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-/**
- * Represents a {@link org.apache.crunch.DoFn} for performing joins.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public abstract class JoinFn<K, U, V> extends
- DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
-
- protected PType<K> keyType;
- protected PType<U> leftValueType;
-
- /**
- * Instantiate with the PType of the value of the left side of the join (used for creating deep
- * copies of values).
- *
- * @param keyType The PType of the value used as the key of the join
- * @param leftValueType The PType of the value type of the left side of the join
- */
- public JoinFn(PType<K> keyType, PType<U> leftValueType) {
- this.keyType = keyType;
- this.leftValueType = leftValueType;
- }
-
- @Override
- public void initialize() {
- this.keyType.initialize(getConfiguration());
- this.leftValueType.initialize(getConfiguration());
- }
-
- /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
- public abstract String getJoinType();
-
- /**
- * Performs the actual joining.
- *
- * @param key The key for this grouping of values.
- * @param id The side that this group of values is from (0 -> left, 1 -> right).
- * @param pairs The group of values associated with this key and id pair.
- * @param emitter The emitter to send the output to.
- */
- public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
- Emitter<Pair<K, Pair<U, V>>> emitter);
-
- /**
- * Split up the input record to make coding a bit more manageable.
- *
- * @param input The input record.
- * @param emitter The emitter to send the output to.
- */
- @Override
- public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
- Emitter<Pair<K, Pair<U, V>>> emitter) {
- join(input.first().first(), input.first().second(), input.second(), emitter);
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
deleted file mode 100644
index 6efeccb..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * Utilities that are useful in joining multiple data sets via a MapReduce.
- *
- */
-public class JoinUtils {
-
- public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
- if (typeFamily == WritableTypeFamily.getInstance()) {
- return TupleWritablePartitioner.class;
- } else {
- return AvroIndexedRecordPartitioner.class;
- }
- }
-
- public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
- if (typeFamily == WritableTypeFamily.getInstance()) {
- return TupleWritableComparator.class;
- } else {
- return AvroPairGroupingComparator.class;
- }
- }
-
- public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
- @Override
- public int getPartition(TupleWritable key, Writable value, int numPartitions) {
- return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
- }
- }
-
- public static class TupleWritableComparator implements RawComparator<TupleWritable> {
-
- private DataInputBuffer buffer = new DataInputBuffer();
- private TupleWritable key1 = new TupleWritable();
- private TupleWritable key2 = new TupleWritable();
-
- @Override
- public int compare(TupleWritable o1, TupleWritable o2) {
- return ((WritableComparable) o1.get(0)).compareTo((WritableComparable) o2.get(0));
- }
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- buffer.reset(b1, s1, l1);
- key1.readFields(buffer);
-
- buffer.reset(b2, s2, l2);
- key2.readFields(buffer);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return compare(key1, key2);
- }
- }
-
- public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
- @Override
- public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
- IndexedRecord record = (IndexedRecord) key.datum();
- return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
- }
- }
-
- public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
- private Schema schema;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
- Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
- schema = keySchema.getFields().get(0).schema();
- }
- }
-
- @Override
- public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
- return ReflectData.get().compare(x.datum(), y.datum(), schema);
- }
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
deleted file mode 100644
index 731c496..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an left outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
- private transient int lastId;
- private transient K lastKey;
- private transient List<U> leftValues;
-
- public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
- super(keyType, leftValueType);
- }
-
- /** {@inheritDoc} */
- @Override
- public void initialize() {
- super.initialize();
- lastId = 1;
- lastKey = null;
- this.leftValues = Lists.newArrayList();
- }
-
- /** {@inheritDoc} */
- @Override
- public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
- if (!key.equals(lastKey)) {
- // Make sure that left side always gets emitted.
- if (0 == lastId) {
- for (U u : leftValues) {
- emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
- }
- }
- lastKey = keyType.getDetachedValue(key);
- leftValues.clear();
- }
- if (id == 0) {
- for (Pair<U, V> pair : pairs) {
- if (pair.first() != null)
- leftValues.add(leftValueType.getDetachedValue(pair.first()));
- }
- } else {
- for (Pair<U, V> pair : pairs) {
- for (U u : leftValues) {
- emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
- }
- }
- }
-
- lastId = id;
- }
-
- /** {@inheritDoc} */
- @Override
- public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
- if (0 == lastId) {
- for (U u : leftValues) {
- emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String getJoinType() {
- return "leftOuterJoin";
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
deleted file mode 100644
index 56476c1..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.io.IOException;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.util.DistCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Utility for doing map side joins on a common key between two {@link PTable}s.
- * <p>
- * A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
- * a mapper. This style of join has the important implication that the output of
- * the join is not sorted, which is the case with a conventional (reducer-based)
- * join.
- * <p>
- * <b>Note:</b>This utility is only supported when running with a
- * {@link MRPipeline} as the pipeline.
- */
-public class MapsideJoin {
-
- /**
- * Join two tables using a map side join. The right-side table will be loaded
- * fully in memory, so this method should only be used if the right side
- * table's contents can fit in the memory allocated to mappers. The join
- * performed by this method is an inner join.
- *
- * @param left
- * The left-side table of the join
- * @param right
- * The right-side table of the join, whose contents will be fully
- * read into memory
- * @return A table keyed on the join key, containing pairs of joined values
- */
- public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
- PTypeFamily tf = left.getTypeFamily();
- Iterable<Pair<K, V>> iterable = right.materialize();
-
- if (iterable instanceof MaterializableIterable) {
- MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
- MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
- right.getPType());
- ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
- if (mi.isSourceTarget()) {
- optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
- }
- return left.parallelDo("mapjoin", mapJoinDoFn,
- tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
- optionsBuilder.build());
- } else { // in-memory pipeline
- return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
- tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
- }
- }
-
- static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
- private Multimap<K, V> joinMap;
-
- public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
- joinMap = HashMultimap.create();
- for (Pair<K, V> joinPair : iterable) {
- joinMap.put(joinPair.first(), joinPair.second());
- }
- }
-
- @Override
- public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
- K key = input.first();
- U value = input.second();
- for (V joinValue : joinMap.get(key)) {
- Pair<U, V> valuePair = Pair.of(value, joinValue);
- emitter.emit(Pair.of(key, valuePair));
- }
- }
- }
-
- static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
- private String inputPath;
- private PType<Pair<K, V>> ptype;
- private Multimap<K, V> joinMap;
-
- public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
- this.inputPath = inputPath;
- this.ptype = ptype;
- }
-
- private Path getCacheFilePath() {
- Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
- if (local == null) {
- throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
- }
- return local;
- }
-
- @Override
- public void configure(Configuration conf) {
- DistCache.addCacheFile(new Path(inputPath), conf);
- }
-
- @Override
- public void initialize() {
- super.initialize();
-
- ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
- getCacheFilePath());
- Iterable<Pair<K, V>> iterable = null;
- try {
- iterable = sourceTarget.read(getConfiguration());
- } catch (IOException e) {
- throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
- }
-
- joinMap = ArrayListMultimap.create();
- for (Pair<K, V> joinPair : iterable) {
- joinMap.put(joinPair.first(), joinPair.second());
- }
- }
-
- @Override
- public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
- K key = input.first();
- U value = input.second();
- for (V joinValue : joinMap.get(key)) {
- Pair<U, V> valuePair = Pair.of(value, joinValue);
- emitter.emit(Pair.of(key, valuePair));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
deleted file mode 100644
index 2789d40..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an right outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
- private transient K lastKey;
- private transient List<U> leftValues;
-
- public RightOuterJoinFn(PType<K> keyType, PType<U> leftValueType) {
- super(keyType, leftValueType);
- }
-
- /** {@inheritDoc} */
- @Override
- public void initialize() {
- super.initialize();
- lastKey = null;
- this.leftValues = Lists.newArrayList();
- }
-
- /** {@inheritDoc} */
- @Override
- public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
- if (!key.equals(lastKey)) {
- lastKey = keyType.getDetachedValue(key);
- leftValues.clear();
- }
- if (id == 0) {
- for (Pair<U, V> pair : pairs) {
- if (pair.first() != null)
- leftValues.add(leftValueType.getDetachedValue(pair.first()));
- }
- } else {
- for (Pair<U, V> pair : pairs) {
- // Make sure that right side gets emitted.
- if (leftValues.isEmpty()) {
- leftValues.add(null);
- }
-
- for (U u : leftValues) {
- emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String getJoinType() {
- return "rightOuterJoin";
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java b/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
deleted file mode 100644
index f1ad9f1..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/join/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Inner and outer joins on collections.
- */
-package org.apache.crunch.lib.join;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/package-info.java b/crunch/src/main/java/org/apache/crunch/lib/package-info.java
deleted file mode 100644
index 2695787..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Joining, sorting, aggregating, and other commonly used functionality.
- */
-package org.apache.crunch.lib;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
deleted file mode 100644
index ae7f49a..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.util.Arrays;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-
-/**
- * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
- */
-public class Comparators {
-
- public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-
- private RawComparator<T> comparator;
-
- @SuppressWarnings("unchecked")
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- JobConf jobConf = new JobConf(conf);
- comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
- }
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
- return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
- }
-
- @Override
- public int compare(T o1, T o2) {
- return -comparator.compare(o1, o2);
- }
- }
-
- public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
-
- private Schema schema;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
- }
- }
-
- @Override
- public int compare(AvroKey<T> o1, AvroKey<T> o2) {
- return -ReflectData.get().compare(o1.datum(), o2.datum(), schema);
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
- return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
- }
- }
-
- public static class TupleWritableComparator extends WritableComparator implements Configurable {
-
- private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-
- private Configuration conf;
- private ColumnOrder[] columnOrders;
-
- public TupleWritableComparator() {
- super(TupleWritable.class, true);
- }
-
- public static void configureOrdering(Configuration conf, Order... orders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
- @Override
- public String apply(Order o) {
- return o.name();
- }
- })));
- }
-
- public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
- @Override
- public String apply(ColumnOrder o) {
- return o.column() + ";" + o.order().name();
- }
- })));
- }
-
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- TupleWritable ta = (TupleWritable) a;
- TupleWritable tb = (TupleWritable) b;
- for (int index = 0; index < columnOrders.length; index++) {
- int order = 1;
- if (columnOrders[index].order() == Order.ASCENDING) {
- order = 1;
- } else if (columnOrders[index].order() == Order.DESCENDING) {
- order = -1;
- } else { // ignore
- continue;
- }
- if (!ta.has(index) && !tb.has(index)) {
- continue;
- } else if (ta.has(index) && !tb.has(index)) {
- return order;
- } else if (!ta.has(index) && tb.has(index)) {
- return -order;
- } else {
- Writable v1 = ta.get(index);
- Writable v2 = tb.get(index);
- if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
- int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
- if (cmp != 0) {
- return order * cmp;
- }
- } else {
- int cmp = v1.hashCode() - v2.hashCode();
- if (cmp != 0) {
- return order * cmp;
- }
- }
- }
- }
- }
- return 0; // ordering using specified cols found no differences
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- if (conf != null) {
- String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
- String[] columnOrderNames = ordering.split(",");
- columnOrders = new ColumnOrder[columnOrderNames.length];
- for (int i = 0; i < columnOrders.length; i++) {
- String[] split = columnOrderNames[i].split(";");
- int column = Integer.parseInt(split[0]);
- Order order = Order.valueOf(split[1]);
- columnOrders[i] = ColumnOrder.by(column, order);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
deleted file mode 100644
index be218f6..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.lib.Sort.ColumnOrder;
-import org.apache.crunch.lib.Sort.Order;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-
-import com.google.common.collect.Lists;
-
-/**
- * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
- */
-public class SortFns {
-
- /**
- * Extracts a single indexed key from a {@code Tuple} instance.
- */
- public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
- private final int index;
-
- public SingleKeyFn(int index) {
- this.index = index;
- }
-
- @Override
- public K map(V input) {
- return (K) input.get(index);
- }
- }
-
- /**
- * Extracts a composite key from a {@code Tuple} instance.
- */
- public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
- private final int[] indices;
- private final TupleFactory tupleFactory;
-
- public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
- this.indices = indices;
- this.tupleFactory = tupleFactory;
- }
-
- @Override
- public K map(V input) {
- Object[] values = new Object[indices.length];
- for (int i = 0; i < indices.length; i++) {
- values[i] = input.get(indices[i]);
- }
- return (K) tupleFactory.makeTuple(values);
- }
- }
-
- /**
- * Builds an Avro {@code GenericRecord} holding the composite key extracted from a {@code Tuple} instance.
- */
- public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
-
- private final int[] indices;
- private final String schemaJson;
- private transient Schema schema;
-
- public AvroGenericFn(int[] indices, Schema schema) {
- this.indices = indices;
- this.schemaJson = schema.toString();
- }
-
- @Override
- public void initialize() {
- this.schema = (new Schema.Parser()).parse(schemaJson);
- }
-
- @Override
- public GenericRecord map(V input) {
- GenericRecord rec = new GenericData.Record(schema);
- for (int i = 0; i < indices.length; i++) {
- rec.put(i, input.get(indices[i]));
- }
- return rec;
- }
- }
-
- /**
- * Constructs an Avro schema for the given {@code PType<S>} that respects the given column
- * orderings.
- */
- public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
- // Guarantee each tuple schema has a globally unique name
- String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
- Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
- List<Schema.Field> fields = Lists.newArrayList();
- AvroType<S> parentAvroType = (AvroType<S>) ptype;
- Schema parentAvroSchema = parentAvroType.getSchema();
-
- for (int index = 0; index < orders.length; index++) {
- ColumnOrder columnOrder = orders[index];
- AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
- Schema fieldSchema = atype.getSchema();
- String fieldName = parentAvroSchema.getFields().get(index).name();
- // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
- // Text instances: making this consistent
- Schema.Field.Order order = columnOrder.order() == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
- Schema.Field.Order.ASCENDING;
- fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
- }
- schema.setFields(fields);
- return schema;
- }
-
- /**
- * Utility class for encapsulating key extraction logic and serialization information about
- * key extraction.
- */
- public static class KeyExtraction<V extends Tuple> {
-
- private PType<V> ptype;
- private final ColumnOrder[] columnOrder;
- private final int[] cols;
-
- private MapFn<V, Object> byFn;
- private PType<Object> keyPType;
-
- public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
- this.ptype = ptype;
- this.columnOrder = columnOrder;
- this.cols = new int[columnOrder.length];
- for (int i = 0; i < columnOrder.length; i++) {
- cols[i] = columnOrder[i].column() - 1;
- }
- init();
- }
-
- private void init() {
- List<PType> pt = ptype.getSubTypes();
- PTypeFamily ptf = ptype.getFamily();
- if (cols.length == 1) {
- byFn = new SingleKeyFn(cols[0]);
- keyPType = pt.get(cols[0]);
- } else {
- TupleFactory tf = null;
- switch (cols.length) {
- case 2:
- tf = TupleFactory.PAIR;
- keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
- break;
- case 3:
- tf = TupleFactory.TUPLE3;
- keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
- break;
- case 4:
- tf = TupleFactory.TUPLE4;
- keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
- break;
- default:
- PType[] pts = new PType[cols.length];
- for (int i = 0; i < pts.length; i++) {
- pts[i] = pt.get(cols[i]);
- }
- tf = TupleFactory.TUPLEN;
- keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
- }
-
- if (ptf == AvroTypeFamily.getInstance()) {
- Schema s = createOrderedTupleSchema(keyPType, columnOrder);
- keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
- byFn = new AvroGenericFn(cols, s);
- } else {
- byFn = new TupleKeyFn(cols, tf);
- }
- }
-
- }
-
- public MapFn<V, Object> getByFn() {
- return byFn;
- }
-
- public PType<Object> getKeyType() {
- return keyPType;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
deleted file mode 100644
index 94fbdbe..0000000
--- a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.sort;
-
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.avro.AvroFileReaderFactory;
-import org.apache.crunch.io.seq.SeqFileReaderFactory;
-import org.apache.crunch.types.writable.WritableDeepCopier;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partition-aware {@code Partitioner} instance that can work with either Avro or Writable-formatted
- * keys.
- */
-public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
-
- public static final String DEFAULT_PATH = "_partition.lst";
- public static final String PARTITIONER_PATH =
- "crunch.totalorderpartitioner.path";
-
- private Configuration conf;
- private Node<K> partitions;
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- try {
- this.conf = conf;
- String parts = getPartitionFile(conf);
- final Path partFile = new Path(parts);
- final FileSystem fs = (DEFAULT_PATH.equals(parts))
- ? FileSystem.getLocal(conf) // assume in DistributedCache
- : partFile.getFileSystem(conf);
-
- Job job = new Job(conf);
- Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
- RawComparator<K> comparator =
- (RawComparator<K>) job.getSortComparator();
- K[] splitPoints = readPartitions(fs, partFile, keyClass, conf, comparator);
- int numReduceTasks = job.getNumReduceTasks();
- if (splitPoints.length != numReduceTasks - 1) {
- throw new IOException("Wrong number of partitions in keyset");
- }
- partitions = new BinarySearchNode(splitPoints, comparator);
- } catch (IOException e) {
- throw new IllegalArgumentException("Can't read partitions file", e);
- }
- }
-
- @Override
- public int getPartition(K key, V value, int modulo) {
- return partitions.findPartition(key);
- }
-
- public static void setPartitionFile(Configuration conf, Path p) {
- conf.set(PARTITIONER_PATH, p.toString());
- }
-
- public static String getPartitionFile(Configuration conf) {
- return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
- }
-
- @SuppressWarnings("unchecked") // map output key class
- private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
- Configuration conf, final RawComparator<K> comparator) throws IOException {
- ArrayList<K> parts = new ArrayList<K>();
- String schema = conf.get("crunch.schema");
- if (schema != null) {
- Schema s = (new Schema.Parser()).parse(schema);
- AvroFileReaderFactory<K> a = new AvroFileReaderFactory<K>(s);
- Iterator<K> iter = CompositePathIterable.create(fs, p, a).iterator();
- while (iter.hasNext()) {
- parts.add((K) new AvroKey<K>(iter.next()));
- }
- } else {
- WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
- SeqFileReaderFactory<K> s = new SeqFileReaderFactory<K>(keyClass);
- Iterator<K> iter = CompositePathIterable.create(fs, p, s).iterator();
- while (iter.hasNext()) {
- parts.add((K) wdc.deepCopy((Writable) iter.next()));
- }
- }
- Collections.sort(parts, comparator);
- return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
- }
-
- /**
- * Interface to the partitioner to locate a key in the partition keyset.
- */
- interface Node<T> {
- /**
- * Locate partition in keyset K, such that [Ki..Ki+1) defines a partition,
- * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
- */
- int findPartition(T key);
- }
-
- class BinarySearchNode implements Node<K> {
- private final K[] splitPoints;
- private final RawComparator<K> comparator;
- BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
- this.splitPoints = splitPoints;
- this.comparator = comparator;
- }
- public int findPartition(K key) {
- final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
- return (pos < 0) ? -pos : pos;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
deleted file mode 100644
index 2dcc64f..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.hadoop.fs.Path;
-
-public class MaterializableIterable<E> implements Iterable<E> {
-
- private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
-
- private final Pipeline pipeline;
- private final ReadableSource<E> source;
- private Iterable<E> materialized;
-
- public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
- this.pipeline = pipeline;
- this.source = source;
- this.materialized = null;
- }
-
- public ReadableSource<E> getSource() {
- return source;
- }
-
- public boolean isSourceTarget() {
- return (source instanceof SourceTarget);
- }
-
- public Path getPath() {
- if (source instanceof FileSourceImpl) {
- return ((FileSourceImpl) source).getPath();
- } else if (source instanceof PathTarget) {
- return ((PathTarget) source).getPath();
- }
- return null;
- }
-
- @Override
- public Iterator<E> iterator() {
- if (materialized == null) {
- pipeline.run();
- materialize();
- }
- return materialized.iterator();
- }
-
- public void materialize() {
- try {
- materialized = source.read(pipeline.getConfiguration());
- } catch (IOException e) {
- LOG.error("Could not materialize: " + source, e);
- throw new CrunchRuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
deleted file mode 100644
index 69082e2..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pair;
-
-public class MaterializableMap<K, V> extends AbstractMap<K, V> {
-
- private Iterable<Pair<K, V>> iterable;
- private Set<Map.Entry<K, V>> entrySet;
-
- public MaterializableMap(Iterable<Pair<K, V>> iterable) {
- this.iterable = iterable;
- }
-
- private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
- HashMap<K, V> m = new HashMap<K, V>();
- for (Pair<K, V> x : xs)
- m.put(x.first(), x.second());
- return m.entrySet();
- }
-
- @Override
- public Set<Map.Entry<K, V>> entrySet() {
- if (entrySet == null)
- entrySet = toMapEntries(iterable);
- return entrySet;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
deleted file mode 100644
index 60e64b1..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/CollectionPObject.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-
-/**
- * A concrete implementation of {@link org.apache.crunch.materialize.pobject.PObjectImpl} whose
- * value is a Java {@link java.util.Collection} containing the elements of the underlying {@link
- * PCollection} for this {@link org.apache.crunch.PObject}.
- *
- * @param <S> The value type for elements contained in the {@code Collection} value encapsulated
- * by this {@code PObject}.
- */
-public class CollectionPObject<S> extends PObjectImpl<S, Collection<S>> {
-
- /**
- * Constructs a new instance of this {@code PObject} implementation.
- *
- * @param collect The backing {@code PCollection} for this {@code PObject}.
- */
- public CollectionPObject(PCollection<S> collect) {
- super(collect);
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<S> process(Iterable<S> input) {
- Collection<S> target = new ArrayList<S>();
- Iterator<S> itr = input.iterator();
- while (itr.hasNext()) {
- target.add(itr.next());
- }
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
deleted file mode 100644
index aa5fd9e..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/FirstElementPObject.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.Iterator;
-
-import org.apache.crunch.PCollection;
-
-/**
- * A concrete implementation of {@link PObjectImpl} that uses the first element in the backing
- * {@link PCollection} as the {@link org.apache.crunch.PObject} value.
- *
- * @param <T> The value type of this {@code PObject}.
- */
-public class FirstElementPObject<T> extends PObjectImpl<T, T> {
-
- /**
- * Constructs a new instance of this {@code PObject} implementation.
- *
- * @param collect The backing {@code PCollection} for this {@code PObject}.
- */
- public FirstElementPObject(PCollection<T> collect) {
- super(collect);
- }
-
- /** {@inheritDoc} */
- @Override
- public T process(Iterable<T> input) {
- Iterator<T> itr = input.iterator();
- if (itr.hasNext()) {
- return itr.next();
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
deleted file mode 100644
index 243997f..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/MapPObject.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pair;
-
-/**
- * A concrete implementation of {@link PObjectImpl} whose
- * value is a Java {@link Map}. The underlying {@link PCollection} for this
- * {@link org.apache.crunch.PObject} must contain {@link Pair}s of values. The
- * first element of the pair will be used as the map key, while the second element will be used
- * as the map value. Note that the contents of the underlying {@code PCollection} may not be
- * fully reflected in the returned {@code Map}, since a single key may be mapped to several values in
- * the underlying {@code PCollection}, and only one of those values will appear in the {@code
- * Map} encapsulated by this {@code PObject}.
- *
- * @param <K> The type of keys for the Map.
- * @param <V> The type of values for the Map.
- */
-public class MapPObject<K, V> extends PObjectImpl<Pair<K, V>, Map<K, V>> {
-
- /**
- * Constructs a new instance of this {@code PObject} implementation.
- *
- * @param collect The backing {@code PCollection} for this {@code PObject}.
- */
- public MapPObject(PCollection<Pair<K, V>> collect) {
- super(collect);
- }
-
- /** {@inheritDoc} */
- @Override
- public Map<K, V> process(Iterable<Pair<K, V>> input) {
- Map<K, V> target = new HashMap<K, V>();
- Iterator<Pair<K, V>> itr = input.iterator();
- while (itr.hasNext()) {
- Pair<K, V> pair = itr.next();
- target.put(pair.first(), pair.second());
- }
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java b/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
deleted file mode 100644
index 59c2ba2..0000000
--- a/crunch/src/main/java/org/apache/crunch/materialize/pobject/PObjectImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize.pobject;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-
-/**
- * An abstract implementation of {@link PObject} that is backed by a {@link PCollection}.
- * Clients creating a concrete implementation should override the method
- * {@link PObjectImpl#process(Iterable)}, which transforms the backing PCollection into the
- * singleton value encapsulated by the PObject. Once this {@code PObject}'s value has been
- * calculated, the value is cached to prevent subsequent materializations of the backing
- * {@code PCollection}.
- *
- * @param <S> The type contained in the underlying PCollection.
- * @param <T> The type encapsulated by this PObject.
- */
-public abstract class PObjectImpl<S, T> implements PObject<T> {
-
- // The underlying PCollection whose contents will be used to generate the value for this
- // PObject.
- private PCollection<S> collection;
-
- // A variable to hold a cached copy of the value of this {@code PObject},
- // to prevent unnecessary materializations of the backing {@code PCollection}.
- private T cachedValue;
-
- // A flag indicating if a value for this {@code PObject} has been cached.
- private boolean isCached;
-
- /**
- * Constructs a new instance of this {@code PObject} implementation.
- *
- * @param collect The backing {@code PCollection} for this {@code PObject}.
- */
- public PObjectImpl(PCollection<S> collect) {
- this.collection = collect;
- this.cachedValue = null;
- this.isCached = false;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return collection.toString();
- }
-
- /** {@inheritDoc} */
- @Override
- public final T getValue() {
- if (!isCached) {
- cachedValue = process(collection.materialize());
- isCached = true;
- }
- return cachedValue;
- }
-
- /**
- * Transforms the provided Iterable, obtained from the backing {@link PCollection},
- * into the value encapsulated by this {@code PObject}.
- *
- * @param input An Iterable whose elements correspond to those of the backing {@code
- * PCollection}.
- * @return The value of this {@code PObject}.
- */
- protected abstract T process(Iterable<S> input);
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/package-info.java b/crunch/src/main/java/org/apache/crunch/package-info.java
deleted file mode 100644
index 38f11bc..0000000
--- a/crunch/src/main/java/org/apache/crunch/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Client-facing API and core abstractions.
- *
- * @see <a href="http://crunch.apache.org/intro.html">Introduction to
- * Apache Crunch</a>
- */
-package org.apache.crunch;
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
deleted file mode 100644
index 151ab82..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-/**
- * Performs deep copies (based on underlying PType deep copying) of Collections.
- *
- * @param <T> The type of the elements contained in the Collection being copied
- */
-public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
-
- private PType<T> elementType;
-
- public CollectionDeepCopier(PType<T> elementType) {
- this.elementType = elementType;
- }
-
- @Override
- public void initialize(Configuration conf) {
- this.elementType.initialize(conf);
- }
-
- @Override
- public Collection<T> deepCopy(Collection<T> source) {
- if (source == null) {
- return null;
- }
- List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
- for (T value : source) {
- copiedCollection.add(elementType.getDetachedValue(value));
- }
- return copiedCollection;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Converter.java b/crunch/src/main/java/org/apache/crunch/types/Converter.java
deleted file mode 100644
index a0dbb16..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/Converter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-
-import org.apache.crunch.DoFn;
-
-/**
- * Converts the input key/value from a MapReduce task into the input to a
- * {@link DoFn}, or takes the output of a {@code DoFn} and write it to the
- * output key/values.
- */
-public interface Converter<K, V, S, T> extends Serializable {
- S convertInput(K key, V value);
-
- T convertIterableInput(K key, Iterable<V> value);
-
- K outputKey(S value);
-
- V outputValue(S value);
-
- Class<K> getKeyClass();
-
- Class<V> getValueClass();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
deleted file mode 100644
index f146e86..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Performs deep copies of values.
- *
- * @param <T> The type of value that will be copied
- */
-public interface DeepCopier<T> extends Serializable {
-
- /**
- * Initialize the deep copier with a job-specific configuration
- *
- * @param conf Job-specific configuration
- */
- void initialize(Configuration conf);
-
- /**
- * Create a deep copy of a value.
- *
- * @param source The value to be copied
- * @return The deep copy of the value
- */
- T deepCopy(T source);
-
- static class NoOpDeepCopier<V> implements DeepCopier<V> {
-
- @Override
- public V deepCopy(V source) {
- return source;
- }
-
- @Override
- public void initialize(Configuration conf) {
- // No initialization needed
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
deleted file mode 100644
index de8903b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Maps;
-
-public class MapDeepCopier<T> implements DeepCopier<Map<String, T>> {
-
- private final PType<T> ptype;
-
- public MapDeepCopier(PType<T> ptype) {
- this.ptype = ptype;
- }
-
- @Override
- public void initialize(Configuration conf) {
- this.ptype.initialize(conf);
- }
-
- @Override
- public Map<String, T> deepCopy(Map<String, T> source) {
- if (source == null) {
- return null;
- }
-
- Map<String, T> deepCopyMap = Maps.newHashMap();
- for (Entry<String, T> entry : source.entrySet()) {
- deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue()));
- }
- return deepCopyMap;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
deleted file mode 100644
index d276cd6..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import com.google.common.collect.Iterables;
-
-/**
- * The {@code PType} instance for {@link PGroupedTable} instances. Its settings
- * are derived from the {@code PTableType} that was grouped to create the
- * {@code PGroupedTable} instance.
- *
- */
-public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
-
- protected static class PTypeIterable<V> implements Iterable<V> {
- private final Iterable<Object> iterable;
- private final MapFn<Object, V> mapFn;
-
- public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
- this.mapFn = mapFn;
- this.iterable = iterable;
- }
-
- public Iterator<V> iterator() {
- return new Iterator<V>() {
- Iterator<Object> iter = iterable.iterator();
-
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- public V next() {
- return mapFn.map(iter.next());
- }
-
- public void remove() {
- iter.remove();
- }
- };
- }
-
- @Override
- public String toString() {
- return Iterables.toString(this);
- }
- }
-
- public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
- private final MapFn<Object, K> keys;
- private final MapFn<Object, V> values;
-
- public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
- this.keys = keys;
- this.values = values;
- }
-
- @Override
- public void configure(Configuration conf) {
- keys.configure(conf);
- values.configure(conf);
- }
-
- public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
- keys.setContext(context);
- values.setContext(context);
- }
-
- @Override
- public void initialize() {
- keys.initialize();
- values.initialize();
- }
-
- @Override
- public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
- return Pair.<K, Iterable<V>> of(keys.map(input.first()), new PTypeIterable(values, input.second()));
- }
- }
-
- protected final PTableType<K, V> tableType;
-
- public PGroupedTableType(PTableType<K, V> tableType) {
- this.tableType = tableType;
- }
-
- public PTableType<K, V> getTableType() {
- return tableType;
- }
-
- @Override
- public PTypeFamily getFamily() {
- return tableType.getFamily();
- }
-
- @Override
- public List<PType> getSubTypes() {
- return tableType.getSubTypes();
- }
-
- @Override
- public Converter getConverter() {
- return tableType.getConverter();
- }
-
- public abstract Converter getGroupingConverter();
-
- public abstract void configureShuffle(Job job, GroupingOptions options);
-
- @Override
- public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
- throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTableType.java b/crunch/src/main/java/org/apache/crunch/types/PTableType.java
deleted file mode 100644
index 3d06f8b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTableType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-
-/**
- * An extension of {@code PType} specifically for {@link PTable} objects. It
- * allows separate access to the {@code PType}s of the key and value for the
- * {@code PTable}.
- *
- */
-public interface PTableType<K, V> extends PType<Pair<K, V>> {
- /**
- * Returns the key type for the table.
- */
- PType<K> getKeyType();
-
- /**
- * Returns the value type for the table.
- */
- PType<V> getValueType();
-
- /**
- * Returns the grouped table version of this type.
- */
- PGroupedTableType<K, V> getGroupedTableType();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
deleted file mode 100644
index ebddf84..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a
- * serialization and storage format that is used to read/write data from/to HDFS. Every
- * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from
- * that {@code PCollection}.
- *
- */
-public interface PType<T> extends Serializable {
- /**
- * Returns the Java type represented by this {@code PType}.
- */
- Class<T> getTypeClass();
-
- /**
- * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
- */
- PTypeFamily getFamily();
-
- MapFn<Object, T> getInputMapFn();
-
- MapFn<T, Object> getOutputMapFn();
-
- Converter getConverter();
-
- /**
- * Initialize this PType for use within a DoFn. This generally only needs to be called when using
- * a PType for {@link #getDetachedValue(Object)}.
- *
- * @param conf Configuration object
- * @see PType#getDetachedValue(Object)
- */
- void initialize(Configuration conf);
-
- /**
- * Returns a copy of a value (or the value itself) that can safely be retained.
- * <p>
- * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held
- * on to for more than the scope of a single iteration, as a reducer (and therefore also a DoFn
- * that has an Iterable as input) re-use deserialized values. More information on object reuse is
- * available in the {@link DoFn} class documentation.
- *
- * @param value The value to be deep-copied
- * @return A deep copy of the input value
- */
- T getDetachedValue(T value);
-
- /**
- * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
- * specified by this {@code PType}.
- */
- ReadableSourceTarget<T> getDefaultFileSource(Path path);
-
- /**
- * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
- */
- List<PType> getSubTypes();
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
deleted file mode 100644
index 9458f14..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * An abstract factory for creating {@code PType} instances that have the same
- * serialization/storage backing format.
- *
- */
-public interface PTypeFamily {
- PType<Void> nulls();
-
- PType<String> strings();
-
- PType<Long> longs();
-
- PType<Integer> ints();
-
- PType<Float> floats();
-
- PType<Double> doubles();
-
- PType<Boolean> booleans();
-
- PType<ByteBuffer> bytes();
-
- <T> PType<T> records(Class<T> clazz);
-
- <T> PType<Collection<T>> collections(PType<T> ptype);
-
- <T> PType<Map<String, T>> maps(PType<T> ptype);
-
- <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
-
- <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3);
-
- <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4);
-
- PType<TupleN> tuples(PType<?>... ptypes);
-
- <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
-
- <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
-
- <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
-
- /**
- * Returns the equivalent of the given ptype for this family, if it exists.
- */
- <T> PType<T> as(PType<T> ptype);
-}