You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/05/20 18:38:23 UTC

crunch git commit: CRUNCH-520: Coverity scan inspection fixes

Repository: crunch
Updated Branches:
  refs/heads/master 61f98eea9 -> d0bb205ea


CRUNCH-520: Coverity scan inspection fixes

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d0bb205e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d0bb205e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d0bb205e

Branch: refs/heads/master
Commit: d0bb205eaf236bf1d81b9070a2db3428cb90a161
Parents: 61f98ee
Author: Sean Owen <so...@cloudera.com>
Authored: Wed May 20 09:13:24 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Wed May 20 09:31:30 2015 -0700

----------------------------------------------------------------------
 .../contrib/bloomfilter/BloomFiltersIT.java     |  7 +++--
 .../java/org/apache/crunch/ConfigurationIT.java |  2 +-
 .../it/java/org/apache/crunch/MapPObjectIT.java |  6 ++--
 .../java/org/apache/crunch/MaterializeIT.java   |  6 ++--
 .../org/apache/crunch/PTableKeyValueIT.java     |  3 +-
 .../org/apache/crunch/PipelineCallableIT.java   |  4 +--
 .../crunch/SingleUseIterableExceptionIT.java    |  4 +--
 .../apache/crunch/StageResultsCountersIT.java   |  2 +-
 .../crunch/impl/mem/MemPipelineUTF8IT.java      |  6 +---
 .../java/org/apache/crunch/io/ToolRunnerIT.java |  4 ++-
 .../java/org/apache/crunch/lib/AggregateIT.java |  5 +++
 .../crunch/lib/join/MapsideJoinStrategyIT.java  |  5 +--
 .../java/org/apache/crunch/fn/Aggregators.java  |  3 +-
 .../lib/jobcontrol/CrunchJobControl.java        | 32 ++++++++++++--------
 .../apache/crunch/impl/mem/CountersWrapper.java |  3 +-
 .../org/apache/crunch/io/CrunchOutputs.java     |  9 ++----
 .../java/org/apache/crunch/lib/Quantiles.java   |  4 +--
 .../crunch/types/writable/TupleWritable.java    |  6 ++--
 .../writable/WritableGroupedTableType.java      |  2 +-
 .../org/apache/crunch/util/PartitionUtils.java  |  2 ++
 .../lib/jobcontrol/CrunchJobControlTest.java    |  2 +-
 .../crunch/impl/SingleUseIterableTest.java      |  8 ++---
 .../writable/GenericArrayWritableTest.java      | 17 +++++++----
 .../crunch/types/writable/WritablesTest.java    |  5 +++
 .../crunch/examples/WordAggregationHBase.java   | 16 ++++++----
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  8 +++--
 .../crunch/io/hbase/WordCountHBaseIT.java       | 24 +++++++++------
 .../org/apache/crunch/io/hbase/HBaseData.java   |  5 +--
 .../org/apache/crunch/io/orc/OrcWritable.java   |  5 +++
 .../apache/crunch/SparkPipelineCallableIT.java  |  4 +--
 .../apache/crunch/impl/spark/SparkRuntime.java  |  4 +--
 31 files changed, 121 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
index d91e07f..c18c8c4 100644
--- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -46,7 +47,7 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable {
         List<String> parts = Arrays.asList(StringUtils.split(input, " "));
         Collection<Key> keys = new HashSet<Key>();
         for (String stringpart : parts) {
-          keys.add(new Key(stringpart.getBytes()));
+          keys.add(new Key(stringpart.getBytes(Charset.forName("UTF-8"))));
         }
         return keys;
       }
@@ -54,8 +55,8 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable {
     Map<String, BloomFilter> filterValues = BloomFilterFactory.createFilter(new Path(inputPath), filterFn).getValue();
     assertEquals(1, filterValues.size());
     BloomFilter filter = filterValues.get("shakes.txt");
-    assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes())));
-    assertTrue(filter.membershipTest(new Key("apples".getBytes())));
+    assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes(Charset.forName("UTF-8")))));
+    assertTrue(filter.membershipTest(new Key("apples".getBytes(Charset.forName("UTF-8")))));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
index 0f65d8f..52ba2b6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java
@@ -19,13 +19,13 @@
  */
 package org.apache.crunch;
 
-import junit.framework.Assert;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
index c48284f..635efdf 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java
@@ -17,7 +17,7 @@
  */
 package org.apache.crunch;
 
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.Map;
@@ -39,8 +39,8 @@ public class MapPObjectIT {
       Pair.of(2, "c"), Pair.of(3, "e"));
 
   public void assertMatches(Map<Integer, String> m) {
-    for (Integer k : m.keySet()) {
-      assertEquals(kvPairs.get(k).second(), m.get(k));
+    for (Map.Entry<Integer, String> e : m.entrySet()) {
+      assertEquals(kvPairs.get(e.getKey()).second(), e.getValue());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index 7bc61df..455b943 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -17,9 +17,6 @@
  */
 package org.apache.crunch;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -42,6 +39,9 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class MaterializeIT {
 
   @Rule

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
index d56e122..a8a387b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java
@@ -23,8 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 
-import junit.framework.Assert;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.TemporaryPath;
@@ -32,6 +30,7 @@ import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
index b4fc19e..95638a1 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
@@ -26,8 +26,8 @@ import org.junit.Test;
 
 import java.util.Map;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class PipelineCallableIT {
   @Rule

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
index ff2897b..8d070cd 100644
--- a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java
@@ -17,8 +17,6 @@
  */
 package org.apache.crunch;
 
-import java.util.Iterator;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.TemporaryPath;
@@ -35,7 +33,7 @@ public class SingleUseIterableExceptionIT {
   static class ReduceFn extends MapFn<Iterable<String>, String> {
     @Override
     public String map(Iterable<String> input) {
-      Iterator<String> iter = input.iterator();
+      input.iterator();
       throw new CrunchRuntimeException("Exception");
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
index 04711e4..e74c166 100644
--- a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java
@@ -17,9 +17,9 @@
  */
 package org.apache.crunch;
 
-import static junit.framework.Assert.assertTrue;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.util.HashSet;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
index 56b167a..3e74cdd 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java
@@ -18,15 +18,10 @@
 package org.apache.crunch.impl.mem;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import junit.framework.Assert;
 
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
@@ -36,6 +31,7 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.text.TextFileTarget;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
index 287ba93..57bb5fa 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch.io;
 
+import java.nio.charset.Charset;
+
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
@@ -205,7 +207,7 @@ public class ToolRunnerIT {
 
     @Override
     public Pair<BytesWritable, BytesWritable> map(String input) {
-      BytesWritable bw = new BytesWritable(input.getBytes());
+      BytesWritable bw = new BytesWritable(input.getBytes(Charset.forName("UTF-8")));
       return Pair.of(bw, bw);
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 1408c73..5675de8 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -248,5 +248,10 @@ public class AggregateIT {
       return true;
     }
 
+    @Override
+    public int hashCode() {
+      return value == null ? 0 : value.hashCode();
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index f9caa3a..1917038 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -159,7 +160,7 @@ public class MapsideJoinStrategyIT {
   public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException {
     MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration());
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
-    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+    readTable(pipeline, "orders.txt");
 
     PTable<Integer, String> filteredCustomerTable = customerTable
         .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType());
@@ -235,7 +236,7 @@ public class MapsideJoinStrategyIT {
     OutputStream out2 = fs.create(path2, true);
 
     for(int i = 0; i < 4; i++){
-      byte[] value = ("value" + i + "\n").getBytes();
+      byte[] value = ("value" + i + "\n").getBytes(Charset.forName("UTF-8"));
       out1.write(value);
       out2.write(value);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index 5a9c157..62ee089 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -1056,7 +1056,8 @@ public final class Aggregators {
     @Override
     public void update(final String next) {
       long length = (next == null) ? 0 : next.length() + separator.length();
-      if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
+      if ((maxOutputLength > 0 && currentLength + length > maxOutputLength) ||
+          (maxInputLength > 0 && next != null && next.length() > maxInputLength)) {
         return;
       }
       if (maxOutputLength > 0) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index aac6296..62147ad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
@@ -156,17 +155,26 @@ public class CrunchJobControl {
   }
 
   private Map<Integer, CrunchControlledJob> getQueue(State state) {
-    Map<Integer, CrunchControlledJob> retv = null;
-    if (state == State.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == State.READY) {
-      retv = this.readyJobs;
-    } else if (state == State.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == State.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
+    Map<Integer, CrunchControlledJob> retv;
+    switch (state) {
+      case WAITING:
+        retv = this.waitingJobs;
+        break;
+      case READY:
+        retv = this.readyJobs;
+        break;
+      case RUNNING:
+        retv = this.runningJobs;
+        break;
+      case SUCCESS:
+        retv = this.successfulJobs;
+        break;
+      case FAILED:
+      case DEPENDENT_FAILED:
+        retv = this.failedJobs;
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown state " + state);
     }
     return retv;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
index ee0906b..7312402 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 
-import javax.annotation.Nullable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -87,7 +86,7 @@ class CountersWrapper extends Counters {
   public Iterator<CounterGroup> iterator() {
     return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() {
       @Override
-      public Iterator<CounterGroup> apply(@Nullable Counters input) {
+      public Iterator<CounterGroup> apply(Counters input) {
         return input.iterator();
       }
     }).iterator());

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 0d06931..247ac08 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -195,13 +195,8 @@ public class CrunchOutputs<K, V> {
     job = getJob(job.getJobID(), namedOutput,baseConf);
 
     OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
-    TaskAttemptContext taskContext = null;
-    RecordWriter<K, V> recordWriter = null;
-
-    if (baseContext != null) {
-      taskContext = getTaskContext(baseContext, job);
-      recordWriter = fmt.getRecordWriter(taskContext);
-    }
+    TaskAttemptContext taskContext = getTaskContext(baseContext, job);
+    RecordWriter<K, V> recordWriter = fmt.getRecordWriter(taskContext);
     OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
     this.outputStates.put(namedOutput, outputState);
     return outputState;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
index d6fc454..4262c58 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
@@ -30,8 +30,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 
-import javax.annotation.Nullable;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -173,7 +171,7 @@ public class Quantiles {
 
       Iterator<V> valueIterator = Iterators.transform(iterator, new Function<Pair<V, Long>, V>() {
         @Override
-        public V apply(@Nullable Pair<V, Long> input) {
+        public V apply(Pair<V, Long> input) {
           return input.first();
         }
       });

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index bdd3ad9..068b0af 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -334,8 +334,10 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
         int cmp = WritableComparator.get(clazz.asSubclass(WritableComparable.class)).compare(
             buffer1.getData(), buffer1.getPosition(), bodySize1,
             buffer2.getData(), buffer2.getPosition(), bodySize2);
-        buffer1.skip(bodySize1);
-        buffer2.skip(bodySize2);
+        long skipped1 = buffer1.skip(bodySize1);
+        long skipped2 = buffer2.skip(bodySize2);
+        Preconditions.checkState(skipped1 == bodySize1);
+        Preconditions.checkState(skipped2 == bodySize2);
         return cmp;
       } else {
         // fallback to deserialization

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index c25345b..c251905 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -94,7 +94,7 @@ class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     WritableType valueType = (WritableType) tableType.getValueType();
     job.setMapOutputKeyClass(keyType.getSerializationClass());
     job.setMapOutputValueClass(valueType.getSerializationClass());
-    if (options.getSortComparatorClass() == null &&
+    if ((options == null || options.getSortComparatorClass() == null) &&
         TupleWritable.class.equals(keyType.getSerializationClass())) {
       job.setSortComparatorClass(TupleWritable.Comparator.class);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
index cdcc401..fbd4ebd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.util;
 
+import com.google.common.base.Preconditions;
 import org.apache.crunch.PCollection;
 import org.apache.hadoop.conf.Configuration;
 
@@ -42,6 +43,7 @@ public class PartitionUtils {
 
   public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
     long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+    Preconditions.checkArgument(bytesPerTask > 0);
     int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask);
     int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS);
     if (maxRecommended > 0 && recommended > maxRecommended) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
index e727ec1..fa226b4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -72,7 +72,7 @@ public class CrunchJobControlTest {
     verify(job3).submit();
   }
 
-  private class IncrementingPipelineCallable extends PipelineCallable<Void> {
+  private static class IncrementingPipelineCallable extends PipelineCallable<Void> {
 
     private String name;
     private boolean executed;

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
index 811a0a3..d1e530a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java
@@ -44,11 +44,9 @@ public class SingleUseIterableTest {
     
     SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values);
 
-    List<Integer> retrievedValues = Lists.newArrayList(iterable);
-
-    for (Integer n : iterable) {
-      
-    }
+    // Consume twice
+    Lists.newArrayList(iterable);
+    Lists.newArrayList(iterable);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
index c446a69..481086b 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -23,12 +23,11 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertThat;
 
+import java.nio.charset.Charset;
 import java.util.Arrays;
 
 import org.apache.crunch.test.Tests;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.junit.Test;
 
 
@@ -47,27 +46,33 @@ public class GenericArrayWritableTest {
   @Test
   public void testNonEmpty() {
     GenericArrayWritable src = new GenericArrayWritable();
-    src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) });
+    src.set(new BytesWritable[] {
+        new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))),
+        new BytesWritable("bar".getBytes(Charset.forName("UTF-8"))) });
 
     GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
 
     assertThat(src.get(), not(sameInstance(dest.get())));
     assertThat(dest.get().length, is(2));
     assertThat(Arrays.asList(dest.get()),
-        hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes())));
+        hasItems(new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))),
+                 new BytesWritable("bar".getBytes(Charset.forName("UTF-8")))));
   }
 
   @Test
   public void testNulls() {
     GenericArrayWritable src = new GenericArrayWritable();
-    src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) });
+    src.set(new BytesWritable[] {
+        new BytesWritable("a".getBytes(Charset.forName("UTF-8"))), null,
+        new BytesWritable("b".getBytes(Charset.forName("UTF-8"))) });
 
     GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
 
     assertThat(src.get(), not(sameInstance(dest.get())));
     assertThat(dest.get().length, is(3));
     assertThat(Arrays.asList(dest.get()),
-        hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null));
+        hasItems(new BytesWritable("a".getBytes(Charset.forName("UTF-8"))),
+                 new BytesWritable("b".getBytes(Charset.forName("UTF-8"))), null));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index 2281473..9af3dea 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -208,6 +208,11 @@ public class WritablesTest {
     }
 
     @Override
+    public int hashCode() {
+      return (left == null ? 0 : left.hashCode()) ^ right;
+    }
+
+    @Override
     public int compareTo(TestWritable o) {
       int cmp = left.compareTo(o.left);
       if (cmp != 0)

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
index b2d24f8..5d62d19 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
@@ -168,13 +168,17 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
   private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException,
       IOException {
     HBaseAdmin hbase = new HBaseAdmin(conf);
-    if (!hbase.tableExists(htableName)) {
-      HTableDescriptor desc = new HTableDescriptor(htableName);
-      for (String s : families) {
-        HColumnDescriptor meta = new HColumnDescriptor(s);
-        desc.addFamily(meta);
+    try {
+      if (!hbase.tableExists(htableName)) {
+        HTableDescriptor desc = new HTableDescriptor(htableName);
+        for (String s : families) {
+          HColumnDescriptor meta = new HColumnDescriptor(s);
+          desc.addFamily(meta);
+        }
+        hbase.createTable(desc);
       }
-      hbase.createTable(desc);
+    } finally {
+      hbase.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 7d8ae83..71cf31f 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -34,7 +34,6 @@ import org.apache.crunch.PipelineResult;
 import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
-import org.apache.crunch.lib.Sort;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
@@ -72,6 +71,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -112,7 +112,7 @@ public class HFileTargetIT implements Serializable {
     // probably created using this process' umask. So we guess the temp dir permissions as
     // 0777 & ~umask, and use that to set the config value.
     Process process = Runtime.getRuntime().exec("/bin/sh -c umask");
-    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("UTF-8")));
     int rc = process.waitFor();
     if(rc == 0) {
       String umask = br.readLine();
@@ -282,7 +282,9 @@ public class HFileTargetIT implements Serializable {
         reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
         assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
       } finally {
-        reader.close();
+        if (reader != null) {
+          reader.close();
+        }
       }
       hfilesCount++;
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index de7b287..dd48352 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Random;
 
@@ -40,7 +41,6 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -65,7 +65,8 @@ public class WordCountHBaseIT {
       byte[] firstStrBytes = input.second().first().getValue(WORD_COLFAM, null);
       byte[] secondStrBytes = input.second().second().getValue(WORD_COLFAM, null);
       if (firstStrBytes != null && secondStrBytes != null) {
-        return Joiner.on(',').join(new String(firstStrBytes), new String(secondStrBytes));
+        return Joiner.on(',').join(new String(firstStrBytes, Charset.forName("UTF-8")),
+                                   new String(secondStrBytes, Charset.forName("UTF-8")));
       }
       return "";
     }
@@ -137,7 +138,7 @@ public class WordCountHBaseIT {
   public void run(Pipeline pipeline) throws Exception {
 
     Random rand = new Random();
-    int postFix = Math.abs(rand.nextInt());
+    int postFix = rand.nextInt() & 0x7FFFFFFF;
     String inputTableName = "crunch_words_" + postFix;
     String outputTableName = "crunch_counts_" + postFix;
     String otherTableName = "crunch_other_" + postFix;
@@ -180,13 +181,16 @@ public class WordCountHBaseIT {
 
     // verify we can do joins.
     HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
-
-    key = 0;
-    key = put(joinTable, key, "zebra");
-    key = put(joinTable, key, "donkey");
-    key = put(joinTable, key, "bird");
-    key = put(joinTable, key, "horse");
-    joinTable.flushCommits();
+    try {
+      key = 0;
+      key = put(joinTable, key, "zebra");
+      key = put(joinTable, key, "donkey");
+      key = put(joinTable, key, "bird");
+      key = put(joinTable, key, "horse");
+      joinTable.flushCommits();
+    } finally {
+      joinTable.close();
+    }
 
     Scan joinScan = new Scan();
     joinScan.addFamily(WORD_COLFAM);

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 84de288..4a721f3 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -66,8 +66,9 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
     HTable htable = new HTable(hconf, table);
 
     String[] scanStrings = StringUtils.getStrings(scansAsString);
-    Scan[] scans = new Scan[scanStrings.length];
-    for(int i = 0; i < scanStrings.length; i++){
+    int length = scanStrings == null ? 0 : scanStrings.length;
+    Scan[] scans = new Scan[length];
+    for(int i = 0; i < length; i++){
       scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
     }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
index 883d0f0..716b291 100644
--- a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
@@ -87,6 +87,11 @@ public class OrcWritable implements WritableComparable<OrcWritable> {
     return compareTo((OrcWritable) obj) == 0;
   }
 
+  @Override
+  public int hashCode() {
+    return blob == null ? 0 : blob.hashCode();
+  }
+
   public void setSerde(BinarySortableSerDe serde) {
     this.serde = serde;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
index 51b65af..d799842 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
@@ -24,8 +24,8 @@ import org.junit.Test;
 
 import java.util.Map;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class SparkPipelineCallableIT extends CrunchTestSupport {
   @Test

http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 4c0cb27..5798e4c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -339,7 +339,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                     job.getOutputFormatClass(),
                     job.getConfiguration());
                 pt.handleOutputs(job.getConfiguration(), tmpPath, -1);
-              } else if (t instanceof MapReduceTarget) {
+              } else { //if (t instanceof MapReduceTarget) {
                 MapReduceTarget mrt = (MapReduceTarget) t;
                 mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0");
                 CrunchOutputs.OutputConfig outConfig =
@@ -348,8 +348,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                 job.setOutputKeyClass(outConfig.keyClass);
                 job.setOutputValueClass(outConfig.valueClass);
                 outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
-              } else {
-                throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
               }
             } catch (Exception et) {
               LOG.error("Spark Exception", et);