You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/05/20 18:38:23 UTC
crunch git commit: CRUNCH-520: Coverity scan inspection fixes
Repository: crunch
Updated Branches:
refs/heads/master 61f98eea9 -> d0bb205ea
CRUNCH-520: Coverity scan inspection fixes
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d0bb205e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d0bb205e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d0bb205e
Branch: refs/heads/master
Commit: d0bb205eaf236bf1d81b9070a2db3428cb90a161
Parents: 61f98ee
Author: Sean Owen <so...@cloudera.com>
Authored: Wed May 20 09:13:24 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Wed May 20 09:31:30 2015 -0700
----------------------------------------------------------------------
.../contrib/bloomfilter/BloomFiltersIT.java | 7 +++--
.../java/org/apache/crunch/ConfigurationIT.java | 2 +-
.../it/java/org/apache/crunch/MapPObjectIT.java | 6 ++--
.../java/org/apache/crunch/MaterializeIT.java | 6 ++--
.../org/apache/crunch/PTableKeyValueIT.java | 3 +-
.../org/apache/crunch/PipelineCallableIT.java | 4 +--
.../crunch/SingleUseIterableExceptionIT.java | 4 +--
.../apache/crunch/StageResultsCountersIT.java | 2 +-
.../crunch/impl/mem/MemPipelineUTF8IT.java | 6 +---
.../java/org/apache/crunch/io/ToolRunnerIT.java | 4 ++-
.../java/org/apache/crunch/lib/AggregateIT.java | 5 +++
.../crunch/lib/join/MapsideJoinStrategyIT.java | 5 +--
.../java/org/apache/crunch/fn/Aggregators.java | 3 +-
.../lib/jobcontrol/CrunchJobControl.java | 32 ++++++++++++--------
.../apache/crunch/impl/mem/CountersWrapper.java | 3 +-
.../org/apache/crunch/io/CrunchOutputs.java | 9 ++----
.../java/org/apache/crunch/lib/Quantiles.java | 4 +--
.../crunch/types/writable/TupleWritable.java | 6 ++--
.../writable/WritableGroupedTableType.java | 2 +-
.../org/apache/crunch/util/PartitionUtils.java | 2 ++
.../lib/jobcontrol/CrunchJobControlTest.java | 2 +-
.../crunch/impl/SingleUseIterableTest.java | 8 ++---
.../writable/GenericArrayWritableTest.java | 17 +++++++----
.../crunch/types/writable/WritablesTest.java | 5 +++
.../crunch/examples/WordAggregationHBase.java | 16 ++++++----
.../apache/crunch/io/hbase/HFileTargetIT.java | 8 +++--
.../crunch/io/hbase/WordCountHBaseIT.java | 24 +++++++++------
.../org/apache/crunch/io/hbase/HBaseData.java | 5 +--
.../org/apache/crunch/io/orc/OrcWritable.java | 5 +++
.../apache/crunch/SparkPipelineCallableIT.java | 4 +--
.../apache/crunch/impl/spark/SparkRuntime.java | 4 +--
31 files changed, 121 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
index d91e07f..c18c8c4 100644
--- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -46,7 +47,7 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable {
List<String> parts = Arrays.asList(StringUtils.split(input, " "));
Collection<Key> keys = new HashSet<Key>();
for (String stringpart : parts) {
- keys.add(new Key(stringpart.getBytes()));
+ keys.add(new Key(stringpart.getBytes(Charset.forName("UTF-8"))));
}
return keys;
}
@@ -54,8 +55,8 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable {
Map<String, BloomFilter> filterValues = BloomFilterFactory.createFilter(new Path(inputPath), filterFn).getValue();
assertEquals(1, filterValues.size());
BloomFilter filter = filterValues.get("shakes.txt");
- assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes())));
- assertTrue(filter.membershipTest(new Key("apples".getBytes())));
+ assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes(Charset.forName("UTF-8")))));
+ assertTrue(filter.membershipTest(new Key("apples".getBytes(Charset.forName("UTF-8")))));
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
index 0f65d8f..52ba2b6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
@@ -19,13 +19,13 @@
*/
package org.apache.crunch;
-import junit.framework.Assert;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
index c48284f..635efdf 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
@@ -17,7 +17,7 @@
*/
package org.apache.crunch;
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Map;
@@ -39,8 +39,8 @@ public class MapPObjectIT {
Pair.of(2, "c"), Pair.of(3, "e"));
public void assertMatches(Map<Integer, String> m) {
- for (Integer k : m.keySet()) {
- assertEquals(kvPairs.get(k).second(), m.get(k));
+ for (Map.Entry<Integer, String> e : m.entrySet()) {
+ assertEquals(kvPairs.get(e.getKey()).second(), e.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index 7bc61df..455b943 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -17,9 +17,6 @@
*/
package org.apache.crunch;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
import java.io.IOException;
import java.util.List;
@@ -42,6 +39,9 @@ import org.junit.Test;
import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class MaterializeIT {
@Rule
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
index d56e122..a8a387b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
@@ -23,8 +23,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import junit.framework.Assert;
-
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.test.TemporaryPath;
@@ -32,6 +30,7 @@ import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
index b4fc19e..95638a1 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
@@ -26,8 +26,8 @@ import org.junit.Test;
import java.util.Map;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class PipelineCallableIT {
@Rule
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
index ff2897b..8d070cd 100644
--- a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
@@ -17,8 +17,6 @@
*/
package org.apache.crunch;
-import java.util.Iterator;
-
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
@@ -35,7 +33,7 @@ public class SingleUseIterableExceptionIT {
static class ReduceFn extends MapFn<Iterable<String>, String> {
@Override
public String map(Iterable<String> input) {
- Iterator<String> iter = input.iterator();
+ input.iterator();
throw new CrunchRuntimeException("Exception");
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
index 04711e4..e74c166 100644
--- a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
@@ -17,9 +17,9 @@
*/
package org.apache.crunch;
-import static junit.framework.Assert.assertTrue;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
index 56b167a..3e74cdd 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
@@ -18,15 +18,10 @@
package org.apache.crunch.impl.mem;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-import junit.framework.Assert;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
@@ -36,6 +31,7 @@ import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.text.TextFileTarget;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
index 287ba93..57bb5fa 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch.io;
+import java.nio.charset.Charset;
+
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
@@ -205,7 +207,7 @@ public class ToolRunnerIT {
@Override
public Pair<BytesWritable, BytesWritable> map(String input) {
- BytesWritable bw = new BytesWritable(input.getBytes());
+ BytesWritable bw = new BytesWritable(input.getBytes(Charset.forName("UTF-8")));
return Pair.of(bw, bw);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 1408c73..5675de8 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -248,5 +248,10 @@ public class AggregateIT {
return true;
}
+ @Override
+ public int hashCode() {
+ return value == null ? 0 : value.hashCode();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index f9caa3a..1917038 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -159,7 +160,7 @@ public class MapsideJoinStrategyIT {
public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException {
MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
- PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+ readTable(pipeline, "orders.txt");
PTable<Integer, String> filteredCustomerTable = customerTable
.parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType());
@@ -235,7 +236,7 @@ public class MapsideJoinStrategyIT {
OutputStream out2 = fs.create(path2, true);
for(int i = 0; i < 4; i++){
- byte[] value = ("value" + i + "\n").getBytes();
+ byte[] value = ("value" + i + "\n").getBytes(Charset.forName("UTF-8"));
out1.write(value);
out2.write(value);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index 5a9c157..62ee089 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -1056,7 +1056,8 @@ public final class Aggregators {
@Override
public void update(final String next) {
long length = (next == null) ? 0 : next.length() + separator.length();
- if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
+ if ((maxOutputLength > 0 && currentLength + length > maxOutputLength) ||
+ (maxInputLength > 0 && next != null && next.length() > maxInputLength)) {
return;
}
if (maxOutputLength > 0) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index aac6296..62147ad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -156,17 +155,26 @@ public class CrunchJobControl {
}
private Map<Integer, CrunchControlledJob> getQueue(State state) {
- Map<Integer, CrunchControlledJob> retv = null;
- if (state == State.WAITING) {
- retv = this.waitingJobs;
- } else if (state == State.READY) {
- retv = this.readyJobs;
- } else if (state == State.RUNNING) {
- retv = this.runningJobs;
- } else if (state == State.SUCCESS) {
- retv = this.successfulJobs;
- } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
- retv = this.failedJobs;
+ Map<Integer, CrunchControlledJob> retv;
+ switch (state) {
+ case WAITING:
+ retv = this.waitingJobs;
+ break;
+ case READY:
+ retv = this.readyJobs;
+ break;
+ case RUNNING:
+ retv = this.runningJobs;
+ break;
+ case SUCCESS:
+ retv = this.successfulJobs;
+ break;
+ case FAILED:
+ case DEPENDENT_FAILED:
+ retv = this.failedJobs;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown state " + state);
}
return retv;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
index ee0906b..7312402 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
-import javax.annotation.Nullable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -87,7 +86,7 @@ class CountersWrapper extends Counters {
public Iterator<CounterGroup> iterator() {
return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() {
@Override
- public Iterator<CounterGroup> apply(@Nullable Counters input) {
+ public Iterator<CounterGroup> apply(Counters input) {
return input.iterator();
}
}).iterator());
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 0d06931..247ac08 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -195,13 +195,8 @@ public class CrunchOutputs<K, V> {
job = getJob(job.getJobID(), namedOutput,baseConf);
OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
- TaskAttemptContext taskContext = null;
- RecordWriter<K, V> recordWriter = null;
-
- if (baseContext != null) {
- taskContext = getTaskContext(baseContext, job);
- recordWriter = fmt.getRecordWriter(taskContext);
- }
+ TaskAttemptContext taskContext = getTaskContext(baseContext, job);
+ RecordWriter<K, V> recordWriter = fmt.getRecordWriter(taskContext);
OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
this.outputStates.put(namedOutput, outputState);
return outputState;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
index d6fc454..4262c58 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
@@ -30,8 +30,6 @@ import org.apache.crunch.Pair;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
-import javax.annotation.Nullable;
-
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -173,7 +171,7 @@ public class Quantiles {
Iterator<V> valueIterator = Iterators.transform(iterator, new Function<Pair<V, Long>, V>() {
@Override
- public V apply(@Nullable Pair<V, Long> input) {
+ public V apply(Pair<V, Long> input) {
return input.first();
}
});
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index bdd3ad9..068b0af 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -334,8 +334,10 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
int cmp = WritableComparator.get(clazz.asSubclass(WritableComparable.class)).compare(
buffer1.getData(), buffer1.getPosition(), bodySize1,
buffer2.getData(), buffer2.getPosition(), bodySize2);
- buffer1.skip(bodySize1);
- buffer2.skip(bodySize2);
+ long skipped1 = buffer1.skip(bodySize1);
+ long skipped2 = buffer2.skip(bodySize2);
+ Preconditions.checkState(skipped1 == bodySize1);
+ Preconditions.checkState(skipped2 == bodySize2);
return cmp;
} else {
// fallback to deserialization
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index c25345b..c251905 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -94,7 +94,7 @@ class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
WritableType valueType = (WritableType) tableType.getValueType();
job.setMapOutputKeyClass(keyType.getSerializationClass());
job.setMapOutputValueClass(valueType.getSerializationClass());
- if (options.getSortComparatorClass() == null &&
+ if ((options == null || options.getSortComparatorClass() == null) &&
TupleWritable.class.equals(keyType.getSerializationClass())) {
job.setSortComparatorClass(TupleWritable.Comparator.class);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
index cdcc401..fbd4ebd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.util;
+import com.google.common.base.Preconditions;
import org.apache.crunch.PCollection;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +43,7 @@ public class PartitionUtils {
public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+ Preconditions.checkArgument(bytesPerTask > 0);
int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask);
int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS);
if (maxRecommended > 0 && recommended > maxRecommended) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
index e727ec1..fa226b4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -72,7 +72,7 @@ public class CrunchJobControlTest {
verify(job3).submit();
}
- private class IncrementingPipelineCallable extends PipelineCallable<Void> {
+ private static class IncrementingPipelineCallable extends PipelineCallable<Void> {
private String name;
private boolean executed;
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
index 811a0a3..d1e530a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -44,11 +44,9 @@ public class SingleUseIterableTest {
SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
- List<Integer> retrievedValues = Lists.newArrayList(iterable);
-
- for (Integer n : iterable) {
-
- }
+ // Consume twice
+ Lists.newArrayList(iterable);
+ Lists.newArrayList(iterable);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
index c446a69..481086b 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -23,12 +23,11 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertThat;
+import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.crunch.test.Tests;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.junit.Test;
@@ -47,27 +46,33 @@ public class GenericArrayWritableTest {
@Test
public void testNonEmpty() {
GenericArrayWritable src = new GenericArrayWritable();
- src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) });
+ src.set(new BytesWritable[] {
+ new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))),
+ new BytesWritable("bar".getBytes(Charset.forName("UTF-8"))) });
GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
assertThat(src.get(), not(sameInstance(dest.get())));
assertThat(dest.get().length, is(2));
assertThat(Arrays.asList(dest.get()),
- hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes())));
+ hasItems(new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))),
+ new BytesWritable("bar".getBytes(Charset.forName("UTF-8")))));
}
@Test
public void testNulls() {
GenericArrayWritable src = new GenericArrayWritable();
- src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) });
+ src.set(new BytesWritable[] {
+ new BytesWritable("a".getBytes(Charset.forName("UTF-8"))), null,
+ new BytesWritable("b".getBytes(Charset.forName("UTF-8"))) });
GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
assertThat(src.get(), not(sameInstance(dest.get())));
assertThat(dest.get().length, is(3));
assertThat(Arrays.asList(dest.get()),
- hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null));
+ hasItems(new BytesWritable("a".getBytes(Charset.forName("UTF-8"))),
+ new BytesWritable("b".getBytes(Charset.forName("UTF-8"))), null));
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index 2281473..9af3dea 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -208,6 +208,11 @@ public class WritablesTest {
}
@Override
+ public int hashCode() {
+ return (left == null ? 0 : left.hashCode()) ^ right;
+ }
+
+ @Override
public int compareTo(TestWritable o) {
int cmp = left.compareTo(o.left);
if (cmp != 0)
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
index b2d24f8..5d62d19 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
@@ -168,13 +168,17 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException,
IOException {
HBaseAdmin hbase = new HBaseAdmin(conf);
- if (!hbase.tableExists(htableName)) {
- HTableDescriptor desc = new HTableDescriptor(htableName);
- for (String s : families) {
- HColumnDescriptor meta = new HColumnDescriptor(s);
- desc.addFamily(meta);
+ try {
+ if (!hbase.tableExists(htableName)) {
+ HTableDescriptor desc = new HTableDescriptor(htableName);
+ for (String s : families) {
+ HColumnDescriptor meta = new HColumnDescriptor(s);
+ desc.addFamily(meta);
+ }
+ hbase.createTable(desc);
}
- hbase.createTable(desc);
+ } finally {
+ hbase.close();
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 7d8ae83..71cf31f 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -34,7 +34,6 @@ import org.apache.crunch.PipelineResult;
import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
-import org.apache.crunch.lib.Sort;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
@@ -72,6 +71,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
+import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -112,7 +112,7 @@ public class HFileTargetIT implements Serializable {
// probably created using this process' umask. So we guess the temp dir permissions as
// 0777 & ~umask, and use that to set the config value.
Process process = Runtime.getRuntime().exec("/bin/sh -c umask");
- BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("UTF-8")));
int rc = process.waitFor();
if(rc == 0) {
String umask = br.readLine();
@@ -282,7 +282,9 @@ public class HFileTargetIT implements Serializable {
reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
} finally {
- reader.close();
+ if (reader != null) {
+ reader.close();
+ }
}
hfilesCount++;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index de7b287..dd48352 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Map;
import java.util.Random;
@@ -40,7 +41,6 @@ import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@@ -65,7 +65,8 @@ public class WordCountHBaseIT {
byte[] firstStrBytes = input.second().first().getValue(WORD_COLFAM, null);
byte[] secondStrBytes = input.second().second().getValue(WORD_COLFAM, null);
if (firstStrBytes != null && secondStrBytes != null) {
- return Joiner.on(',').join(new String(firstStrBytes), new String(secondStrBytes));
+ return Joiner.on(',').join(new String(firstStrBytes, Charset.forName("UTF-8")),
+ new String(secondStrBytes, Charset.forName("UTF-8")));
}
return "";
}
@@ -137,7 +138,7 @@ public class WordCountHBaseIT {
public void run(Pipeline pipeline) throws Exception {
Random rand = new Random();
- int postFix = Math.abs(rand.nextInt());
+ int postFix = rand.nextInt() & 0x7FFFFFFF;
String inputTableName = "crunch_words_" + postFix;
String outputTableName = "crunch_counts_" + postFix;
String otherTableName = "crunch_other_" + postFix;
@@ -180,13 +181,16 @@ public class WordCountHBaseIT {
// verify we can do joins.
HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
-
- key = 0;
- key = put(joinTable, key, "zebra");
- key = put(joinTable, key, "donkey");
- key = put(joinTable, key, "bird");
- key = put(joinTable, key, "horse");
- joinTable.flushCommits();
+ try {
+ key = 0;
+ key = put(joinTable, key, "zebra");
+ key = put(joinTable, key, "donkey");
+ key = put(joinTable, key, "bird");
+ key = put(joinTable, key, "horse");
+ joinTable.flushCommits();
+ } finally {
+ joinTable.close();
+ }
Scan joinScan = new Scan();
joinScan.addFamily(WORD_COLFAM);
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 84de288..4a721f3 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -66,8 +66,9 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
HTable htable = new HTable(hconf, table);
String[] scanStrings = StringUtils.getStrings(scansAsString);
- Scan[] scans = new Scan[scanStrings.length];
- for(int i = 0; i < scanStrings.length; i++){
+ int length = scanStrings == null ? 0 : scanStrings.length;
+ Scan[] scans = new Scan[length];
+ for(int i = 0; i < length; i++){
scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
index 883d0f0..716b291 100644
--- a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
@@ -87,6 +87,11 @@ public class OrcWritable implements WritableComparable<OrcWritable> {
return compareTo((OrcWritable) obj) == 0;
}
+ @Override
+ public int hashCode() {
+ return blob == null ? 0 : blob.hashCode();
+ }
+
public void setSerde(BinarySortableSerDe serde) {
this.serde = serde;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
index 51b65af..d799842 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
@@ -24,8 +24,8 @@ import org.junit.Test;
import java.util.Map;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class SparkPipelineCallableIT extends CrunchTestSupport {
@Test
http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 4c0cb27..5798e4c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -339,7 +339,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
job.getOutputFormatClass(),
job.getConfiguration());
pt.handleOutputs(job.getConfiguration(), tmpPath, -1);
- } else if (t instanceof MapReduceTarget) {
+ } else { //if (t instanceof MapReduceTarget) {
MapReduceTarget mrt = (MapReduceTarget) t;
mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0");
CrunchOutputs.OutputConfig outConfig =
@@ -348,8 +348,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
job.setOutputKeyClass(outConfig.keyClass);
job.setOutputValueClass(outConfig.valueClass);
outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
- } else {
- throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
}
} catch (Exception et) {
LOG.error("Spark Exception", et);