You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/04/05 17:51:50 UTC

[3/4] kudu git commit: Spark ITBLL

Spark ITBLL

This adds a new end-to-end test for Spark based on the existing
MapReduce ITBLL test. Common parts of the two have been abstracted into
a utility class.

The implementation is only compatible with Spark2/Scala2.11, so the new
submodule is deactivated when compiling with the Spark1/Scala2.10
profile.

The new job's command line arguments are different (and hopefully
simpler) than the MR ITBLL, but the generated tables are designed to be
interoperable, meaning the MR verify job should work on tables generated
with the Spark generator, and vice versa.

Change-Id: I968fb236f1e93e548db9fd79443912c664e06a1f
Reviewed-on: http://gerrit.cloudera.org:8080/6419
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/94899aad
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/94899aad
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/94899aad

Branch: refs/heads/master
Commit: 94899aad07e067d6895915e2bb6b5770edd69d7e
Parents: 71deb95
Author: Dan Burkert <da...@apache.org>
Authored: Fri Mar 17 13:52:51 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Wed Apr 5 17:51:25 2017 +0000

----------------------------------------------------------------------
 build-support/jenkins/build-and-test.sh         |   6 +-
 .../mapreduce/tools/BigLinkedListCommon.java    | 190 ++++++++
 .../tools/IntegrationTestBigLinkedList.java     | 151 +------
 java/kudu-spark-tools/pom.xml                   | 190 ++++++++
 .../tools/IntegrationTestBigLinkedList.scala    | 453 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  23 +
 .../IntegrationTestBigLinkedListTest.scala      |  84 ++++
 java/kudu-spark/pom.xml                         |  29 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  |   3 +-
 java/pom.xml                                    | 106 +++--
 10 files changed, 1025 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/build-support/jenkins/build-and-test.sh
----------------------------------------------------------------------
diff --git a/build-support/jenkins/build-and-test.sh b/build-support/jenkins/build-and-test.sh
index 98b0d1c..0c58969 100755
--- a/build-support/jenkins/build-and-test.sh
+++ b/build-support/jenkins/build-and-test.sh
@@ -360,12 +360,12 @@ if [ "$BUILD_JAVA" == "1" ]; then
     EXIT_STATUS=1
     FAILURES="$FAILURES"$'Java build/test failed\n'
   fi
-  # Test kudu-spark with Spark 2.x + Scala 2.11 profile
+  # Test kudu-spark and kudu-spark-tools with Spark 2.x + Scala 2.11 profile
   # This won't work if there are ever Spark integration tests!
   rm -rf kudu-spark/target/
-  if ! mvn $MVN_FLAGS -Pspark2_2.11 -Dtest="org.apache.kudu.spark.kudu.*" test; then
+  if ! mvn $MVN_FLAGS -Pspark2_2.11 -Dtest="org.apache.kudu.spark.*.*" test; then
     EXIT_STATUS=1
-    FAILURES="$FAILURES"$'Java build/test failed\n'
+    FAILURES="$FAILURES"$'spark2 build/test failed\n'
   fi
   set +x
   popd

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/BigLinkedListCommon.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/BigLinkedListCommon.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/BigLinkedListCommon.java
new file mode 100644
index 0000000..9e13984
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/BigLinkedListCommon.java
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.math.BigInteger;
+import java.util.Collections;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * Static constants, helper methods, and utility classes for BigLinkedList
+ * implementations.
+ *
+ * Any definitions which must be kept in-sync between ITBLL implementations
+ * should be kept here.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BigLinkedListCommon {
+
+  public static final String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
+  public static final String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
+  public static final String HEADS_TABLE_NAME_KEY = "IntegrationTestBigLinkedList.heads_table";
+  public static final String DEFAULT_HEADS_TABLE_NAME = "IntegrationTestBigLinkedListHeads";
+
+  /** Row key, two times 8 bytes. */
+  public static final String COLUMN_KEY_ONE = "key1";
+  public static final int COLUMN_KEY_ONE_IDX = 0;
+  public static final String COLUMN_KEY_TWO = "key2";
+  public static final int COLUMN_KEY_TWO_IDX = 1;
+
+  /** Link to the id of the prev node in the linked list, two times 8 bytes. */
+  public static final String COLUMN_PREV_ONE = "prev1";
+  public static final int COLUMN_PREV_ONE_IDX = 2;
+  public static final String COLUMN_PREV_TWO = "prev2";
+  public static final int COLUMN_PREV_TWO_IDX = 3;
+
+  /** the id of the row within the same client. */
+  public static final String COLUMN_ROW_ID = "row_id";
+  public static final int COLUMN_ROW_ID_IDX = 4;
+
+  /** identifier of the mapred task that generated this row. */
+  public static final String COLUMN_CLIENT = "client";
+  public static final int COLUMN_CLIENT_IDX = 5;
+
+  /** The number of times this row was updated. */
+  public static final String COLUMN_UPDATE_COUNT = "update_count";
+  public static final int COLUMN_UPDATE_COUNT_IDX = 6;
+
+  public enum Counts {
+
+    /** Nodes which are not contained in the previous pointer of any other nodes. */
+    UNREFERENCED,
+
+    /** Nodes which are referenced from another node, but do not appear in the table. */
+    UNDEFINED,
+
+    /** Nodes which have a single reference from another node. */
+    REFERENCED,
+
+    /** Nodes which have multiple references from other nodes. */
+    EXTRAREFERENCES,
+  }
+
+  public static Schema getTableSchema() {
+    return new Schema(ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_ONE, Type.INT64).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_TWO, Type.INT64).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_PREV_ONE, Type.INT64).nullable(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_PREV_TWO, Type.INT64).nullable(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_ROW_ID, Type.INT64).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_CLIENT, Type.STRING).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_UPDATE_COUNT, Type.INT32).build()
+    ));
+  }
+
+  public static Schema getHeadsTableSchema() {
+    return new Schema(ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_ONE, Type.INT64).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_TWO, Type.INT64).key(true).build()
+    ));
+  }
+
+  public static CreateTableOptions getCreateTableOptions(Schema schema,
+                                                         int numReplicas,
+                                                         int rangePartitions,
+                                                         int hashPartitions) {
+    Preconditions.checkArgument(rangePartitions > 0);
+    Preconditions.checkArgument(hashPartitions > 0);
+
+    CreateTableOptions options = new CreateTableOptions().setNumReplicas(numReplicas);
+
+    if (rangePartitions > 1) {
+      options.setRangePartitionColumns(ImmutableList.of(COLUMN_KEY_ONE));
+      BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
+      BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
+      BigInteger step = max.multiply(BigInteger.valueOf(2))
+                           .divide(BigInteger.valueOf(rangePartitions));
+
+      PartialRow splitRow = schema.newPartialRow();
+      for (int i = 1; i < rangePartitions; i++) {
+        long key = min.add(step.multiply(BigInteger.valueOf(i))).longValue();
+        splitRow.addLong(COLUMN_KEY_ONE_IDX, key);
+        options.addSplitRow(splitRow);
+      }
+    } else {
+      options.setRangePartitionColumns(Collections.<String>emptyList());
+    }
+
+    if (hashPartitions > 1) {
+      options.addHashPartitions(ImmutableList.of(COLUMN_KEY_ONE), hashPartitions);
+    }
+
+    return options;
+  }
+
+  /**
+   * Implementation of the Xoroshiro128+ PRNG.
+   * Copied under the public domain from SquidLib.
+   */
+  public static class Xoroshiro128PlusRandom {
+    private long state0;
+    private long state1;
+
+    public Xoroshiro128PlusRandom() {
+      this((long) (Math.random() * Long.MAX_VALUE));
+    }
+
+    public Xoroshiro128PlusRandom(long seed) {
+      long state = seed + 0x9E3779B97F4A7C15L;
+      long z = state;
+      z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
+      z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
+      state0 = z ^ (z >>> 31);
+      state += state0 + 0x9E3779B97F4A7C15L;
+      z = state;
+      z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
+      z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
+      state1 = z ^ (z >>> 31);
+    }
+
+    public long nextLong() {
+      final long s0 = state0;
+      long s1 = state1;
+      final long result = s0 + s1;
+
+      s1 ^= s0;
+      state0 = Long.rotateLeft(s0, 55) ^ s1 ^ (s1 << 14); // a, b
+      state1 = Long.rotateLeft(s1, 36); // c
+
+      return result;
+    }
+
+    public void nextBytes(final byte[] bytes) {
+      int i = bytes.length;
+      int n = 0;
+      while (i != 0) {
+        n = Math.min(i, 8);
+        for (long bits = nextLong(); n-- != 0; bits >>>= 8) {
+          bytes[--i] = (byte) bits;
+        }
+      }
+    }
+  }
+
+  /** Uninstantiable helper class. */
+  private BigLinkedListCommon() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
index 6a26340..beddb71 100644
--- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
@@ -18,7 +18,6 @@ package org.apache.kudu.mapreduce.tools;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -26,7 +25,6 @@ import java.util.List;
 import java.util.UUID;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -63,9 +61,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.client.AbstractKuduScannerBuilder;
@@ -86,6 +82,8 @@ import org.apache.kudu.mapreduce.CommandLineParser;
 import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
 import org.apache.kudu.util.Pair;
 
+import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.*;
+
 /**
  * <p>
  * This is an integration test borrowed from goraci, written by Keith Turner,
@@ -229,31 +227,6 @@ import org.apache.kudu.util.Pair;
 public class IntegrationTestBigLinkedList extends Configured implements Tool {
   private static final byte[] NO_KEY = new byte[1];
 
-  protected static final String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
-
-  protected static final String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
-
-  protected static final String HEADS_TABLE_NAME_KEY = "IntegrationTestBigLinkedList.heads_table";
-
-  protected static final String DEFAULT_HEADS_TABLE_NAME = "IntegrationTestBigLinkedListHeads";
-
-  /** Row key, two times 8 bytes. */
-  private static final String COLUMN_KEY_ONE = "key1";
-  private static final String COLUMN_KEY_TWO = "key2";
-
-  /** Link to the id of the prev node in the linked list, two times 8 bytes. */
-  private static final String COLUMN_PREV_ONE = "prev1";
-  private static final String COLUMN_PREV_TWO = "prev2";
-
-  /** identifier of the mapred task that generated this row. */
-  private static final String COLUMN_CLIENT = "client";
-
-  /** the id of the row within the same client. */
-  private static final String COLUMN_ROW_ID = "row_id";
-
-  /** The number of times this row was updated. */
-  private static final String COLUMN_UPDATE_COUNT = "update_count";
-
   /** How many rows to write per map task. This has to be a multiple of 25M. */
   private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
       = "IntegrationTestBigLinkedList.generator.num_rows";
@@ -282,89 +255,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     int updateCount;
   }
 
-  static Schema getTableSchema() {
-    List<ColumnSchema> columns = new ArrayList<ColumnSchema>(7);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_ONE, Type.INT64)
-        .key(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_TWO, Type.INT64)
-        .key(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_PREV_ONE, Type.INT64)
-        .nullable(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_PREV_TWO, Type.INT64)
-        .nullable(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_ROW_ID, Type.INT64)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_CLIENT, Type.STRING)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_UPDATE_COUNT, Type.INT32)
-        .build());
-    return new Schema(columns);
-  }
-
-  static Schema getHeadsTableSchema() {
-    List<ColumnSchema> columns = new ArrayList<ColumnSchema>(2);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_ONE, Type.INT64)
-        .key(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder(COLUMN_KEY_TWO, Type.INT64)
-        .key(true)
-        .build());
-    return new Schema(columns);
-  }
-
-  /**
-   * Implementation of the Xoroshiro128+ PRNG.
-   * Copied under the public domain from SquidLib.
-   */
-  private static class Xoroshiro128PlusRandom {
-    private long state0;
-    private long state1;
-
-    public Xoroshiro128PlusRandom() {
-      this((long) (Math.random() * Long.MAX_VALUE));
-    }
-
-    public Xoroshiro128PlusRandom(long seed) {
-      long state = seed + 0x9E3779B97F4A7C15L;
-      long z = state;
-      z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
-      z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
-      state0 = z ^ (z >>> 31);
-      state += state0 + 0x9E3779B97F4A7C15L;
-      z = state;
-      z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
-      z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
-      state1 = z ^ (z >>> 31);
-    }
-
-    public long nextLong() {
-      final long s0 = state0;
-      long s1 = state1;
-      final long result = s0 + s1;
-
-      s1 ^= s0;
-      state0 = Long.rotateLeft(s0, 55) ^ s1 ^ (s1 << 14); // a, b
-      state1 = Long.rotateLeft(s1, 36); // c
-
-      return result;
-    }
-
-    public void nextBytes(final byte[] bytes) {
-      int i = bytes.length;
-      int n = 0;
-      while (i != 0) {
-        n = Math.min(i, 8);
-        for (long bits = nextLong(); n-- != 0; bits >>>= 8) {
-          bytes[--i] = (byte) bits;
-        }
-      }
-    }
-  }
-
   /**
    * A Map only job that generates random linked list and stores them.
    */
@@ -579,9 +469,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
       private static <T> void circularLeftShift(T[] first) {
         T ez = first[0];
-        for (int i = 0; i < first.length - 1; i++) {
-          first[i] = first[i + 1];
-        }
+        System.arraycopy(first, 1, first, 0, first.length - 1);
         first[first.length - 1] = ez;
       }
 
@@ -666,34 +554,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     }
 
     private void createSchema(String tableName, Schema schema, int numTablets) throws Exception {
-      Preconditions.checkNotNull(client);
-      if (numTablets < 1) {
-        numTablets = 1;
-      }
-
       if (client.tableExists(tableName)) {
         return;
       }
 
-      CreateTableOptions builder =
-          new CreateTableOptions().setNumReplicas(parser.getNumReplicas())
-                                  .setRangePartitionColumns(ImmutableList.of("key1", "key2"));
-      if (numTablets > 1) {
-        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
-        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
-        BigInteger step = max.multiply(BigInteger.valueOf(2))
-            .divide(BigInteger.valueOf(numTablets));
-        LOG.info("min: {}, max: {}, step: {}", min, max, step);
-        PartialRow splitRow = schema.newPartialRow();
-        splitRow.addLong("key2", Long.MIN_VALUE);
-        for (int i = 1; i < numTablets; i++) {
-          long key = min.add(step.multiply(BigInteger.valueOf(i))).longValue();
-          LOG.info("key " + key);
-          splitRow.addLong("key1", key);
-          builder.addSplitRow(splitRow);
-        }
-      }
-
+      CreateTableOptions builder = getCreateTableOptions(schema,
+                                                         parser.getNumReplicas(),
+                                                         numTablets, 1);
       client.createTable(tableName, schema, builder);
     }
 
@@ -784,7 +651,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
       @Override
       protected void map(NullWritable key, RowResult value, Mapper.Context context)
-          throws IOException ,InterruptedException {
+          throws IOException, InterruptedException {
         Bytes.setLong(rowKey, value.getLong(0));
         Bytes.setLong(rowKey, value.getLong(1), 8);
 
@@ -803,10 +670,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       }
     }
 
-    public enum Counts {
-      UNREFERENCED, UNDEFINED, REFERENCED, EXTRAREFERENCES
-    }
-
     public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark-tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/pom.xml b/java/kudu-spark-tools/pom.xml
new file mode 100644
index 0000000..2c6ab9d
--- /dev/null
+++ b/java/kudu-spark-tools/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-parent</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kudu-spark-tools</artifactId>
+    <name>Kudu Spark Tools</name>
+    <description>Collection of tools using Spark and Kudu</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client-tools</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-spark2_2.11</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-spark2_2.11</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>${spark2.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <!-- make sure wrong scala version is not pulled in -->
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <!-- make sure wrong scala version is not pulled in -->
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scalap</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>${spark2.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala-2.11.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.11</artifactId>
+            <version>2.2.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalamock</groupId>
+            <artifactId>scalamock-scalatest-support_2.11</artifactId>
+            <version>3.1.4</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.0</version>
+                <configuration>
+                    <charset>${project.build.sourceEncoding}</charset>
+                    <scalaVersion>${scala-2.11.version}</scalaVersion>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <configuration>
+                    <artifactSet>
+                        <includes>
+                            <include>*:*</include>
+                        </includes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
new file mode 100644
index 0000000..52647f8
--- /dev/null
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.spark.tools
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.kudu.client.SessionConfiguration.FlushMode
+import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
+import org.apache.kudu.mapreduce.tools.BigLinkedListCommon.{Xoroshiro128PlusRandom, _}
+import org.apache.kudu.spark.kudu.KuduContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext, TaskContext}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.util.Try
+
+/**
+  * Spark port of [[org.apache.kudu.mapreduce.tools.IntegrationTestBigLinkedList]].
+  *
+  * Major differences:
+  *   * Currently, only the generator and verifier jobs are implemented.
+  *   * The heads table is not written to during generate, and not used during verify.
+  *   * The generate job does not write in batches. Instead, it writes a head node,
+  *     followed by many tail nodes into the table, and then updates just the
+  *     head node to point at the final tail node. Writes use AUTO_FLUSH_BACKGROUND.
+  *     This is hopefully easier to understand, and has the advantage of stressing
+  *     slightly different code paths than the MR version.
+  */
+object IntegrationTestBigLinkedList {
+  val LOG: Logger = LoggerFactory.getLogger(IntegrationTestBigLinkedList.getClass)
+
+  def usage: String =
+    s"""
+       | Usage: COMMAND [COMMAND options]");
+       |    where COMMAND is one of:
+       |
+       |        generate    A Spark job that generates linked list data.
+       |
+       |        verify      A Spark job that verifies generated linked list data.
+       |                    Fails the job if any UNDEFINED, UNREFERENCED, or
+       |                    EXTRAREFERENCES nodes are found. Do not run at the
+       |                     same time as the Generate command.
+       |
+       |        loop        Loops the generate and verify jobs indefinitely.
+       |                    Data is not cleaned between runs, so each iteration
+       |                    adds more data.
+    """.stripMargin
+
+  def parseIntFlag(flag: String, num: String): Int = {
+    Try(num.toInt).getOrElse(fail(s"failed to parse $flag value as integer: $num"))
+  }
+
+  def parseLongFlag(flag: String, num: String): Long = {
+    Try(num.toLong).getOrElse(fail(s"failed to parse $flag value as integer: $num"))
+  }
+
+  def fail(msg: String): Nothing = {
+    System.err.println(msg)
+    sys.exit(1)
+  }
+
+  def nanosToHuman(n: Long): String = {
+    if (n > 10 * 60 * 1e9) "%s.3m".format(n / (60 * 1e9))
+    else if (n > 1e9) "%s.3s".format(n / 1e9)
+    else if (n > 1e6) "%s.3ms".format(n / 1e6)
+    else if (n > 1e3) "%s.3\u03bcs".format(n / 1e3)
+    else s"${n}ns"
+  }
+
+  def main(args: Array[String]): Unit = {
+    if (args.isEmpty) { fail(usage) }
+
+    args(0).toLowerCase() match {
+      case "generate" => Generator.main(args.slice(1, args.length))
+      case "verify" => Verifier.main(args.slice(1, args.length))
+      case "loop" => Looper.main(args.slice(1, args.length))
+      case _ => fail(usage)
+    }
+  }
+}
+
+object Generator {
+  import IntegrationTestBigLinkedList.{LOG, fail, nanosToHuman, parseIntFlag}
+
+  def usage: String =
+    s"""
+       | Usage: generate --tasks=<tasks> --lists=<lists> --nodes=<nodes>
+       |                 --hash-partitions=<hash-partitions> --range-partitions=<range-partitions>
+       |                 --replicas=<replicas> --master-addrs=<master-addrs> --table-name=<table-name>
+       |    where
+       |      tasks: number of Spark tasks to create, default: 1
+       |      lists: number of linked lists to create per task, default: 1
+       |      nodes: number of nodes to create per list, default: 10000000
+       |      hashPartitions: number of hash partitions to create for the new linked list table, if it doesn't exist, default: 1
+       |      rangePartitions: number of range partitions to create for the new linked list table, if it doesn't exist, default: 1
+       |      replicas: number of replicas to create for the new linked list table, if it doesn't exist, default: 1
+       |      master-addrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      table-name: the name of the linked list table, default: $DEFAULT_TABLE_NAME
+     """.stripMargin
+
+  case class Args(tasks: Int = 1,
+                  lists: Int = 1,
+                  nodes: Int = 10000000,
+                  hashPartitions: Int = 1,
+                  rangePartitions: Int = 1,
+                  replicas: Int = 1,
+                  masterAddrs: String = "localhost",
+                  tableName: String = DEFAULT_TABLE_NAME)
+
+  object Args {
+    private def parseInner(options: Args, args: List[String]): Args = {
+      args match {
+        case Nil => options
+        case "--help" :: _ =>
+          System.err.println(usage)
+          sys.exit(0)
+        case flag :: Nil => fail(s"flag $flag has no value\n$usage")
+        case flag :: value :: tail =>
+          val newOptions: Args = flag match {
+            case "--tasks" => options.copy(tasks = parseIntFlag(flag, value))
+            case "--lists" => options.copy(lists = parseIntFlag(flag, value))
+            case "--nodes" => options.copy(nodes = parseIntFlag(flag, value))
+            case "--hash-partitions" => options.copy(hashPartitions = parseIntFlag(flag, value))
+            case "--range-partitions" => options.copy(rangePartitions = parseIntFlag(flag, value))
+            case "--replicas" => options.copy(replicas = parseIntFlag(flag, value))
+            case "--master-addrs" => options.copy(masterAddrs = value)
+            case "--table-name" => options.copy(tableName = value)
+            case _ => fail(s"unknown generate flag $flag")
+          }
+          parseInner(newOptions, tail)
+      }
+    }
+
+    def parse(args: Array[String]): Args = {
+      parseInner(Args(), args.flatMap(_.split('=')).toList)
+    }
+  }
+
+  def run(args: Args, sc: SparkContext): Unit = {
+
+    val kc = new KuduContext(args.masterAddrs)
+    val applicationId = sc.applicationId
+
+    val client: KuduClient = kc.syncClient
+    if (!client.tableExists(args.tableName)) {
+      val schema = getTableSchema
+      val options = getCreateTableOptions(schema, args.replicas,
+        args.rangePartitions, args.hashPartitions)
+      client.createTable(args.tableName, getTableSchema, options)
+    }
+
+    // Run the generate tasks
+    sc.makeRDD(0 until args.tasks, args.tasks)
+      .foreach(_ => generate(args, applicationId, kc))
+  }
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("Integration Test Big Linked List Generator")
+    val sc = new SparkContext(conf)
+    run(Args.parse(args), sc)
+  }
+
+  /**
+    * Entry point for testing. SparkContext is a singleton,
+    * so tests must create and manage their own.
+    */
+  @VisibleForTesting
+  def testMain(args: Array[String], sc: SparkContext): Unit = {
+    run(Args.parse(args), sc)
+  }
+
+  def generate(args: Args,
+               applicationId: String,
+               kc: KuduContext): Unit = {
+    val taskContext = TaskContext.get()
+    val clientId = s"$applicationId-${taskContext.partitionId()}"
+
+    val rand = new Xoroshiro128PlusRandom()
+
+    val client:KuduClient = kc.syncClient
+
+    val table: KuduTable = client.openTable(args.tableName)
+    val session: KuduSession = client.newSession()
+    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    try {
+      for (_ <- 0 until args.lists) {
+        val start = System.nanoTime()
+        insertList(clientId, args, table, session, rand)
+        LOG.info(s"$clientId inserted ${args.nodes} node linked list in {}",
+                 nanosToHuman(System.nanoTime() - start))
+      }
+    } finally {
+      session.close()
+    }
+  }
+
+  def insertList(clientId: String,
+                 args: Args,
+                 table: KuduTable,
+                 session: KuduSession,
+                 rand: Xoroshiro128PlusRandom): Unit = {
+
+    // Write the head node to the table.
+    val headKeyOne = rand.nextLong()
+    val headKeyTwo = rand.nextLong()
+
+    {
+      val insert = table.newInsert()
+      insert.getRow.addLong(COLUMN_KEY_ONE_IDX, headKeyOne)
+      insert.getRow.addLong(COLUMN_KEY_TWO_IDX, headKeyTwo)
+      insert.getRow.addLong(COLUMN_ROW_ID_IDX, 0)
+      insert.getRow.addString(COLUMN_CLIENT_IDX, clientId)
+      insert.getRow.addInt(COLUMN_UPDATE_COUNT_IDX, 0)
+      session.apply(insert)
+    }
+
+    // Write the rest of the list nodes.
+    var prevKeyOne = headKeyOne
+    var prevKeyTwo = headKeyTwo
+    for (rowIdx <- 1 until args.nodes) {
+      val keyOne = rand.nextLong()
+      val keyTwo = rand.nextLong()
+      val insert = table.newInsert()
+      insert.getRow.addLong(COLUMN_KEY_ONE_IDX, keyOne)
+      insert.getRow.addLong(COLUMN_KEY_TWO_IDX, keyTwo)
+      insert.getRow.addLong(COLUMN_PREV_ONE_IDX, prevKeyOne)
+      insert.getRow.addLong(COLUMN_PREV_TWO_IDX, prevKeyTwo)
+      insert.getRow.addLong(COLUMN_ROW_ID_IDX, rowIdx)
+      insert.getRow.addString(COLUMN_CLIENT_IDX, clientId)
+      insert.getRow.addInt(COLUMN_UPDATE_COUNT_IDX, 0)
+      session.apply(insert)
+      prevKeyOne = keyOne
+      prevKeyTwo = keyTwo
+    }
+
+    // Update the head node's previous pointers to point to the last node.
+    {
+      val update = table.newUpdate()
+      update.getRow.addLong(COLUMN_KEY_ONE_IDX, headKeyOne)
+      update.getRow.addLong(COLUMN_KEY_TWO_IDX, headKeyTwo)
+      update.getRow.addLong(COLUMN_PREV_ONE_IDX, prevKeyOne)
+      update.getRow.addLong(COLUMN_PREV_TWO_IDX, prevKeyTwo)
+      session.apply(update)
+    }
+
+    session.flush()
+    val errors = session.getPendingErrors
+    if (errors.getRowErrors.length > 0) {
+      throw new RuntimeException(errors.getRowErrors
+        .map(_.getErrorStatus.toString)
+        .mkString("Row errors: [", ", ", "]"))
+    }
+  }
+}
+
+object Verifier {
+  import IntegrationTestBigLinkedList.{fail, parseLongFlag}
+
+  def usage: String =
+    s"""
+       | Usage: verify --nodes=<nodes> --master-addrs=<master-addrs> --table-name=<table-name>
+       |    where
+       |      nodes: number of nodes expected to be in the linked list table
+       |      master-addrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      table-name: the name of the linked list table, default: $DEFAULT_TABLE_NAME
+     """.stripMargin
+
+  case class Args(nodes: Option[Long] = None,
+                  masterAddrs: String = "localhost",
+                  tableName: String = DEFAULT_TABLE_NAME)
+
+  object Args {
+    private def parseInner(options: Args, args: List[String]): Args = {
+      args match {
+        case Nil => options
+        case "--help" :: _ =>
+          System.err.println(usage)
+          sys.exit(0)
+        case flag :: Nil => fail(s"flag $flag has no value\n$usage")
+        case flag :: value :: tail =>
+          val newOptions = flag match {
+            case "--nodes" => options.copy(nodes = Some(parseLongFlag(flag, value)))
+            case "--master-addrs" => options.copy(masterAddrs = value)
+            case "--table-name" => options.copy(tableName = value)
+            case _ => fail(s"unknown verify flag $flag")
+          }
+          parseInner(newOptions, tail)
+      }
+    }
+
+    def parse(args: Array[String]): Args = {
+      parseInner(Args(), args.flatMap(_.split('=')).toList)
+    }
+  }
+
+  case class Counts(referenced: Long,
+                    unreferenced: Long,
+                    extrareferences: Long,
+                    undefined: Long)
+
+  /**
+    * Verifies the expected count against the count of nodes from a verification run.
+    * @param expected the expected node count
+    * @param counts the node counts returned by the verification job
+    * @return an error message, if the verification fails
+    */
+  def verify(expected: Option[Long], counts: Counts): Option[String] = {
+    if (expected.exists(_ != counts.referenced)) {
+      Some(s"Found ${counts.referenced} referenced nodes, " +
+           s"which does not match the expected count of ${expected.get} nodes")
+    } else if (counts.unreferenced > 0) {
+      Some(s"Found ${counts.unreferenced} unreferenced nodes")
+    } else if (counts.undefined > 0) {
+      Some(s"Found ${counts.undefined} undefined nodes")
+    } else if (counts.extrareferences > 0) {
+      Some(s"Found ${counts.extrareferences} extra-referenced nodes")
+    } else None
+  }
+
+  @VisibleForTesting
+  def run(args: Args, sc: SparkContext): Counts = {
+    import org.apache.kudu.spark.kudu._
+    val sql = new SQLContext(sc)
+
+    sql.read
+       .option("kudu.master", args.masterAddrs)
+       .option("kudu.table", args.tableName)
+       .kudu
+       .createOrReplaceTempView("nodes")
+
+    // Get a table of all nodes and their ref count
+    sql.sql(
+      s"""
+         | SELECT (SELECT COUNT(*)
+         |         FROM nodes t2
+         |         WHERE t1.$COLUMN_KEY_ONE = t2.$COLUMN_PREV_ONE
+         |           AND t1.$COLUMN_KEY_TWO = t2.$COLUMN_PREV_TWO) AS ref_count
+         | FROM nodes t1
+     """.stripMargin).createOrReplaceTempView("ref_counts")
+
+    // Compress the ref counts down to 0, 1, or 2.  0 Indicates no references,
+    // 1 indicates a single reference, and 2 indicates more than 1 reference.
+    sql.sql(
+      s"""
+         | SELECT (CASE WHEN ref_count > 1 THEN 2 ELSE ref_count END) as ref_count
+         | FROM ref_counts
+       """.stripMargin).createOrReplaceTempView("ref_counts")
+
+    // Aggregate the ref counts
+    sql.sql(
+      s"""
+         | SELECT ref_count, COUNT(*) as nodes
+         | FROM ref_counts
+         | GROUP BY ref_count
+       """.stripMargin).createOrReplaceTempView("ref_counts")
+
+    // Transform the ref count to a state.
+    sql.sql(
+      s"""
+         | SELECT CASE WHEN ref_count = 0 THEN "UNREFERENCED"
+         |             WHEN ref_count = 1 THEN "REFERENCED"
+         |             ELSE "EXTRAREFERENCES" END as state,
+         |        nodes
+         | FROM ref_counts
+       """.stripMargin).createOrReplaceTempView("ref_counts")
+
+    // Find all referenced but undefined nodes.
+    sql.sql(
+      s"""
+         | SELECT $COLUMN_CLIENT as list, "UNDEFINED" as state, COUNT(*) as nodes
+         | FROM nodes t1
+         | WHERE $COLUMN_PREV_ONE IS NOT NULL
+         |   AND $COLUMN_PREV_TWO IS NOT NULL
+         |   AND NOT EXISTS (
+         |       SELECT * FROM nodes t2
+         |       WHERE t1.$COLUMN_PREV_ONE = t2.$COLUMN_KEY_ONE
+         |         AND t1.$COLUMN_PREV_TWO = t2.$COLUMN_KEY_TWO)
+         | GROUP BY $COLUMN_CLIENT
+       """.stripMargin).createOrReplaceTempView("undefined")
+
+    // Combine the ref counts and undefined counts tables.
+    val rows = sql.sql(
+      s"""
+         | SELECT state, nodes FROM ref_counts
+         | UNION ALL
+         | SELECT state, nodes FROM undefined
+       """.stripMargin).collect()
+
+    // Extract the node counts for each state from the rows.
+    rows.foldLeft(Counts(0, 0, 0, 0))((counts, row) => {
+      val state = row.getString(0)
+      val count = row.getLong(1)
+      state match {
+        case "REFERENCED" => counts.copy(referenced = count)
+        case "UNREFERENCED" => counts.copy(unreferenced = count)
+        case "UNDEFINED" => counts.copy(undefined = count)
+        case "EXTRAREFERENCES" => counts.copy(extrareferences = count)
+      }
+    })
+  }
+
+  @VisibleForTesting
+  def testMain(arguments: Array[String], sc: SparkContext): Counts = {
+    run(Args.parse(arguments), sc)
+  }
+
+  def main(arguments: Array[String]): Unit = {
+    val args = Args.parse(arguments)
+    val conf = new SparkConf().setAppName("Integration Test Big Linked List Generator")
+    val sc = new SparkContext(conf)
+
+    val counts = run(Args.parse(arguments), sc)
+    verify(args.nodes, counts).map(fail)
+  }
+}
+
+object Looper {
+  import IntegrationTestBigLinkedList.{LOG, fail}
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("Integration Test Big Linked List Looper")
+    val sc = new SparkContext(conf)
+
+    val genArgs = Generator.Args.parse(args)
+    var verifyArgs = Verifier.Args(masterAddrs = genArgs.masterAddrs,
+                                   tableName = genArgs.tableName)
+    val nodesPerLoop = genArgs.tasks * genArgs.lists * genArgs.nodes
+
+    for (n <- Stream.from(1)) {
+      Generator.run(genArgs, sc)
+      val count = Verifier.run(verifyArgs, sc)
+      val expected = verifyArgs.nodes.map(_ + nodesPerLoop)
+      Verifier.verify(expected, count).map(fail)
+      verifyArgs = verifyArgs.copy(nodes = Some(expected.getOrElse(nodesPerLoop)))
+      LOG.info("*************************************************")
+      LOG.info(s"Completed $n loops. Nodes verified: ${count.referenced}")
+      LOG.info("*************************************************")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark-tools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/resources/log4j.properties b/java/kudu-spark-tools/src/test/resources/log4j.properties
new file mode 100644
index 0000000..535996c
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger = WARN, out
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+log4j.logger.org.apache.kudu = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedListTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedListTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedListTest.scala
new file mode 100644
index 0000000..9d1faa5
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedListTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.tools
+
+import org.apache.kudu.client.SessionConfiguration.FlushMode
+import org.apache.kudu.mapreduce.tools.BigLinkedListCommon._
+import org.apache.kudu.spark.kudu.TestContext
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FunSuite, Matchers}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class IntegrationTestBigLinkedListTest extends FunSuite with TestContext with Matchers {
+
+  test("Spark ITBLL") {
+    Generator.testMain(Array("--tasks=2",
+                             "--lists=2",
+                             "--nodes=10000",
+                             "--hash-partitions=2",
+                             "--range-partitions=2",
+                             "--replicas=1",
+                            s"--master-addrs=${miniCluster.getMasterAddresses}"),
+                       sc)
+
+    // Insert bad nodes in order to test the verifier:
+    //
+    //  (0, 0) points to an undefined node (-1, -1)
+    //  (0, 1) points to (0, 0)
+    //  (0, 2) points to (0, 0)
+    //
+    // Thus, (-1, -1) is undefined, (0, 0) is overreferenced,
+    // and (0, 1) and (0, 2) are unreferenced.
+
+    val table = kuduClient.openTable(DEFAULT_TABLE_NAME)
+    val session = kuduClient.newSession()
+    session.setFlushMode(FlushMode.MANUAL_FLUSH)
+
+    for ((key1, key2, prev1, prev2) <- List((0, 0, -1, -1),
+                                            (0, 1, 0, 0),
+                                            (0, 2, 0, 0))) {
+      val insert = table.newInsert()
+      insert.getRow.addLong(COLUMN_KEY_ONE_IDX, key1)
+      insert.getRow.addLong(COLUMN_KEY_TWO_IDX, key2)
+      insert.getRow.addLong(COLUMN_PREV_ONE_IDX, prev1)
+      insert.getRow.addLong(COLUMN_PREV_TWO_IDX, prev2)
+      insert.getRow.addLong(COLUMN_ROW_ID_IDX, -1)
+      insert.getRow.addString(COLUMN_CLIENT_IDX, "bad-nodes")
+      insert.getRow.addInt(COLUMN_UPDATE_COUNT_IDX, 0)
+      session.apply(insert)
+    }
+
+    for (response <- session.flush().asScala) {
+      if (response.hasRowError) {
+        // This might indicate that the generated linked lists overlapped with
+        // the bad nodes, but the odds are low.
+        throw new AssertionError(response.getRowError.getErrorStatus.toString)
+      }
+    }
+
+    val counts = Verifier.testMain(Array(s"--master-addrs=${miniCluster.getMasterAddresses}"), sc)
+    assertEquals(2 * 2 * 10000, counts.referenced)
+    assertEquals(1, counts.extrareferences)
+    assertEquals(2, counts.unreferenced)
+    assertEquals(1, counts.undefined)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 230ac95..31d2fdb 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -154,6 +154,7 @@
                     </execution>
                 </executions>
             </plugin>
+
             <!-- http://stackoverflow.com/questions/270445 -->
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
@@ -165,7 +166,7 @@
                         <goals><goal>add-source</goal></goals>
                         <configuration>
                             <sources>
-                                <source>${compat.src}</source>
+                                <source>${kudu-spark.compat.src}</source>
                             </sources>
                         </configuration>
                     </execution>
@@ -173,30 +174,4 @@
             </plugin>
         </plugins>
     </build>
-
-    <profiles>
-        <profile>
-            <id>spark_2.10</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <properties>
-                <spark.version>1.6.1</spark.version>
-                <spark.version.label>spark</spark.version.label>
-                <scala.version>2.10.4</scala.version>
-                <scala.binary.version>2.10</scala.binary.version>
-                <compat.src>src/main/spark1</compat.src>
-            </properties>
-        </profile>
-        <profile>
-            <id>spark2_2.11</id>
-            <properties>
-                <spark.version>2.0.1</spark.version>
-                <spark.version.label>spark2</spark.version.label>
-                <scala.version>2.11.6</scala.version>
-                <scala.binary.version>2.11</scala.binary.version>
-                <compat.src>src/main/spark2</compat.src>
-            </properties>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index f7875b5..4faaa79 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -80,8 +80,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     val kuduRelation = createRelation(sqlContext, parameters)
     mode match {
       case SaveMode.Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
-      case _ => throw new UnsupportedOperationException(
-        "Currently, only Append is supported")
+      case _ => throw new UnsupportedOperationException("Currently, only Append is supported")
     }
 
     kuduRelation

http://git-wip-us.apache.org/repos/asf/kudu/blob/94899aad/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 5091b0f..1a82ef2 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -72,6 +72,12 @@
         <protobuf.version>2.6.1</protobuf.version>
         <slf4j.version>1.7.12</slf4j.version>
 
+        <!-- Scala Library dependencies -->
+        <spark1.version>1.6.1</spark1.version>
+        <spark2.version>2.0.1</spark2.version>
+        <scala-2.10.version>2.10.4</scala-2.10.version>
+        <scala-2.11.version>2.11.6</scala-2.11.version>
+
         <!-- Misc variables -->
         <testdata.dir>target/testdata</testdata.dir>
         <testArgLine>-enableassertions -Xmx1900m
@@ -272,42 +278,74 @@
     </build>
 
     <pluginRepositories>
-      <!-- For maven-protoc-plugin -->
-      <pluginRepository>
-        <id>protoc-plugin</id>
-        <url>http://maven.davidtrott.com/repository</url>
-        <name>Protoc Plugin Repository</name>
-      </pluginRepository>
-      <!-- For schema-validator-maven-plugin -->
-      <pluginRepository>
-        <id>schema-validator-maven-plugin</id>
-        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-        <name>Cloudera Plugin Repositories</name>
-      </pluginRepository>
+        <!-- For maven-protoc-plugin -->
+        <pluginRepository>
+            <id>protoc-plugin</id>
+            <url>http://maven.davidtrott.com/repository</url>
+            <name>Protoc Plugin Repository</name>
+        </pluginRepository>
+        <!-- For schema-validator-maven-plugin -->
+        <pluginRepository>
+            <id>schema-validator-maven-plugin</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+            <name>Cloudera Plugin Repositories</name>
+        </pluginRepository>
     </pluginRepositories>
 
-  <repositories>
-    <repository>
-      <id>cdh.repo</id>
-      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-      <name>Cloudera Repositories</name>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
- </repositories>
+    <repositories>
+        <repository>
+            <id>cdh.repo</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+            <name>Cloudera Repositories</name>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <profiles>
+        <!-- Build the jepsen test for Kudu.
+
+             Disabled by default since Java 8 and maven >= 3.3.6 is required to
+             build the kudu-jepsen artifact.  To enable, add '-Pjepsen'
+             to the maven command line. -->
+        <profile>
+            <id>jepsen</id>
+            <modules>
+                <module>kudu-jepsen</module>
+            </modules>
+        </profile>
 
-  <profiles>
-    <!-- Build the jepsen test for Kudu.
+        <!-- Manage Scala and Spark versions.
 
-         Disabled by default since Java 8 and maven >= 3.3.6 is required to
-         build the kudu-jepsen artifact.  To enable, add '-Pjepsen'
-         to the maven command line. -->
-    <profile>
-      <id>jepsen</id>
-      <modules>
-        <module>kudu-jepsen</module>
-      </modules>
-    </profile>
-  </profiles>
+             Kudu supports integration with Spark2/Scala2.11 *and* Spark1/Scala2.10
+             with the same module, so we use profiles to control which mode to compile in. -->
+        <profile>
+            <id>spark_2.10</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <spark.version>${spark1.version}</spark.version>
+                <spark.version.label>spark</spark.version.label>
+                <scala.version>${scala-2.10.version}</scala.version>
+                <scala.binary.version>2.10</scala.binary.version>
+                <kudu-spark.compat.src>src/main/spark1</kudu-spark.compat.src>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark2_2.11</id>
+            <properties>
+                <spark.version>${spark2.version}</spark.version>
+                <spark.version.label>spark2</spark.version.label>
+                <scala.version>${scala-2.11.version}</scala.version>
+                <scala.binary.version>2.11</scala.binary.version>
+                <kudu-spark.compat.src>src/main/spark2</kudu-spark.compat.src>
+            </properties>
+            <modules>
+                <!-- kudu-spark-tools is only compatible with Spark2 -->
+                <module>kudu-spark-tools</module>
+            </modules>
+        </profile>
+    </profiles>
 </project>