Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:09 UTC

[04/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
new file mode 100644
index 0000000..d732b71
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class KuduSinkTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+                                .setNumReplicas(1);
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  @Test
+  public void testMandatoryParameters() {
+    LOG.info("Testing mandatory parameters...");
+
+    KuduSink sink = new KuduSink(syncClient);
+
+    HashMap<String, String> parameters = new HashMap<>();
+    Context context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+        //good
+    }
+
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+    context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+        //good
+    }
+
+    LOG.info("Testing mandatory parameters finished successfully.");
+  }
+
+  @Test(expected = FlumeException.class)
+  public void testMissingTable() throws Exception {
+    LOG.info("Testing missing table...");
+
+    KuduSink sink = createSink("missingTable");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    LOG.info("Testing missing table finished successfully.");
+  }
+
+  @Test
+  public void testEmptyChannelWithDefaults() throws Exception {
+    testEventsWithDefaults(0);
+  }
+
+  @Test
+  public void testOneEventWithDefaults() throws Exception {
+    testEventsWithDefaults(1);
+  }
+
+  @Test
+  public void testThreeEventsWithDefaults() throws Exception {
+    testEventsWithDefaults(3);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+    doTestDuplicateRows(true);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+    doTestDuplicateRows(false);
+  }
+
+  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
+    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+    String tableName = table.getName();
+    Context sinkContext = new Context();
+    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+                    Boolean.toString(ignoreDuplicateRows));
+    KuduSink sink = createSink(tableName, sinkContext);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < 2; i++) {
+      Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = sink.process();
+      if (!ignoreDuplicateRows) {
+        fail("Incorrectly ignored duplicate rows!");
+      }
+      assertTrue("incorrect status for empty channel", status == Sink.Status.READY);
+    } catch (EventDeliveryException e) {
+      if (ignoreDuplicateRows) {
+        throw new AssertionError("Failed to ignore duplicate rows!", e);
+      } else {
+        LOG.info("Correctly did not ignore duplicate rows", e);
+        return;
+      }
+    }
+
+    // We only get here if the process() succeeded.
+    try {
+      List<String> rows = scanTableToStrings(table);
+      assertEquals("1 row expected", 1, rows.size());
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    LOG.info("Testing duplicate events finished successfully.");
+  }
+
+  private void testEventsWithDefaults(int eventCount) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events");
+    String tableName = table.getName();
+    KuduSink sink = createSink(tableName);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < eventCount; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes());
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
+    } else {
+      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    }
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  private KuduSink createSink(String tableName) {
+    return createSink(tableName, new Context());
+  }
+
+  private KuduSink createSink(String tableName, Context ctx) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+    return sink;
+  }
+}
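
Beyond the diff itself, the wiring these tests exercise is plain Flume plumbing. A minimal
sketch of driving KuduSink by hand, mirroring the createSink() helper above (the table name
"events" and master address "master-1:7051" are placeholders, not values from this commit):

    KuduClient client = new KuduClient.KuduClientBuilder("master-1:7051").build();
    KuduSink sink = new KuduSink(client);

    Map<String, String> parameters = new HashMap<>();
    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "events");
    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, "master-1:7051");
    Configurables.configure(sink, new Context(parameters));

    Channel channel = new MemoryChannel();
    Configurables.configure(channel, new Context());
    sink.setChannel(channel);
    sink.start();

    Transaction tx = channel.getTransaction();
    tx.begin();
    channel.put(EventBuilder.withBody("payload body 0".getBytes()));
    tx.commit();
    tx.close();

    Sink.Status status = sink.process(); // drains the channel batch into Kudu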

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
deleted file mode 100644
index d732b71..0000000
--- a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kududb.flume.sink;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Assert;
-import org.junit.Test;
-import org.kududb.ColumnSchema;
-import org.kududb.Schema;
-import org.kududb.Type;
-import org.kududb.client.BaseKuduTest;
-import org.kududb.client.CreateTableOptions;
-import org.kududb.client.KuduTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KuduSinkTest extends BaseKuduTest {
-  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    LOG.info("Creating new table...");
-
-    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
-                                .setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
-
-    LOG.info("Created new table.");
-
-    return table;
-  }
-
-  @Test
-  public void testMandatoryParameters() {
-    LOG.info("Testing mandatory parameters...");
-
-    KuduSink sink = new KuduSink(syncClient);
-
-    HashMap<String, String> parameters = new HashMap<>();
-    Context context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-        //good
-    }
-
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
-    context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-        //good
-    }
-
-    LOG.info("Testing mandatory parameters finished successfully.");
-  }
-
-  @Test(expected = FlumeException.class)
-  public void testMissingTable() throws Exception {
-    LOG.info("Testing missing table...");
-
-    KuduSink sink = createSink("missingTable");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    LOG.info("Testing missing table finished successfully.");
-  }
-
-  @Test
-  public void testEmptyChannelWithDefaults() throws Exception {
-    testEventsWithDefaults(0);
-  }
-
-  @Test
-  public void testOneEventWithDefaults() throws Exception {
-    testEventsWithDefaults(1);
-  }
-
-  @Test
-  public void testThreeEventsWithDefaults() throws Exception {
-    testEventsWithDefaults(3);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
-    doTestDuplicateRows(true);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
-    doTestDuplicateRows(false);
-  }
-
-  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
-    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
-    String tableName = table.getName();
-    Context sinkContext = new Context();
-    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
-                    Boolean.toString(ignoreDuplicateRows));
-    KuduSink sink = createSink(tableName, sinkContext);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < 2; i++) {
-      Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    try {
-      Sink.Status status = sink.process();
-      if (!ignoreDuplicateRows) {
-        fail("Incorrectly ignored duplicate rows!");
-      }
-      assertTrue("incorrect status for empty channel", status == Sink.Status.READY);
-    } catch (EventDeliveryException e) {
-      if (ignoreDuplicateRows) {
-        throw new AssertionError("Failed to ignore duplicate rows!", e);
-      } else {
-        LOG.info("Correctly did not ignore duplicate rows", e);
-        return;
-      }
-    }
-
-    // We only get here if the process() succeeded.
-    try {
-      List<String> rows = scanTableToStrings(table);
-      assertEquals("1 row expected", 1, rows.size());
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    }
-
-    LOG.info("Testing duplicate events finished successfully.");
-  }
-
-  private void testEventsWithDefaults(int eventCount) throws Exception {
-    LOG.info("Testing {} events...", eventCount);
-
-    KuduTable table = createNewTable("test" + eventCount + "events");
-    String tableName = table.getName();
-    KuduSink sink = createSink(tableName);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes());
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-
-  private KuduSink createSink(String tableName) {
-    return createSink(tableName, new Context());
-  }
-
-  private KuduSink createSink(String tableName, Context ctx) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
-    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
-  }
-}
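
The ignore-duplicate-rows behavior exercised by doTestDuplicateRows() maps onto the Kudu
session API. A hedged sketch of the underlying mechanism, assuming a KuduClient "client"
and a KuduTable "table" with the single binary "payload" key column used above:

    KuduSession session = client.newSession();
    // When true, "key already present" row errors are swallowed instead of
    // failing the batch; this is what IGNORE_DUPLICATE_ROWS configures.
    session.setIgnoreAllDuplicateRows(true);

    for (int i = 0; i < 2; i++) {
      Insert insert = table.newInsert();
      insert.getRow().addBinary("payload", "key-0".getBytes()); // same key twice
      session.apply(insert);
    }
    session.flush(); // with the flag set, exactly one row lands in the table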

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
new file mode 100644
index 0000000..05c18f2
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.apache.hadoop.conf.Configuration;
+import org.kududb.client.KuduClient;
+
+/**
+ * Utility class that manages configurations common to all MR jobs. For example,
+ * any job that uses {@link KuduTableMapReduceUtil} to set up an input or output format
+ * and that has parsed the command line arguments with
+ * {@link org.apache.hadoop.util.GenericOptionsParser} can simply be passed:
+ * <code>
+ * -Dkudu.master.addresses=ADDRESSES
+ * </code>
+ * in order to specify where the masters are.
+ * Use {@link CommandLineParser#getHelpSnippet()} to provide usage text for the configurations
+ * managed by this class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class CommandLineParser {
+  private final Configuration conf;
+  public static final String MASTER_ADDRESSES_KEY = "kudu.master.addresses";
+  public static final String MASTER_ADDRESSES_DEFAULT = "127.0.0.1";
+  public static final String OPERATION_TIMEOUT_MS_KEY = "kudu.operation.timeout.ms";
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT =
+      AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
+  public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
+  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
+      AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
+  public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
+  public static final int NUM_REPLICAS_DEFAULT = 3;
+
+  /**
+   * Constructor that uses a Configuration that has already been through
+   * {@link org.apache.hadoop.util.GenericOptionsParser}'s command line parsing.
+   * @param conf the configuration from which job configurations will be extracted
+   */
+  public CommandLineParser(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the configured master's config.
+   * @return a string that contains the passed config, or the default value
+   */
+  public String getMasterAddresses() {
+    return conf.get(MASTER_ADDRESSES_KEY, MASTER_ADDRESSES_DEFAULT);
+  }
+
+  /**
+   * Get the configured timeout for operations on sessions and scanners.
+   * @return a long that represents the passed timeout, or the default value
+   */
+  public long getOperationTimeoutMs() {
+    return conf.getLong(OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Get the configured timeout for admin operations.
+   * @return a long that represents the passed timeout, or the default value
+   */
+  public long getAdminOperationTimeoutMs() {
+    return conf.getLong(ADMIN_OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Get the configured timeout for socket reads.
+   * @return a long that represents the passed timeout, or the default value
+   */
+  public long getSocketReadTimeoutMs() {
+    return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Get the number of replicas to use when configuring a new table.
+   * @return an int that represents the passed number of replicas to use, or the default value.
+   */
+  public int getNumReplicas() {
+    return conf.getInt(NUM_REPLICAS_KEY, NUM_REPLICAS_DEFAULT);
+  }
+
+  /**
+   * Get an async client connected to the configured Master(s).
+   * @return an async kudu client
+   */
+  public AsyncKuduClient getAsyncClient() {
+    return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
+        .defaultOperationTimeoutMs(getOperationTimeoutMs())
+        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
+        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
+        .build();
+  }
+
+  /**
+   * Get a client connected to the configured Master(s).
+   * @return a kudu client
+   */
+  public KuduClient getClient() {
+    return new KuduClient.KuduClientBuilder(getMasterAddresses())
+        .defaultOperationTimeoutMs(getOperationTimeoutMs())
+        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
+        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
+        .build();
+  }
+
+  /**
+   * This method returns a single multi-line string that contains the help snippet to append to
+   * the tail of a usage() or help() type of method.
+   * @return a string with all the available configurations and their defaults
+   */
+  public static String getHelpSnippet() {
+    return "\nAdditionally, the following options are available:" +
+      "  -D" + OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for read and write " +
+          "operations, defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
+      "  -D" + ADMIN_OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for admin operations " +
+        ", defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
+      "  -D" + SOCKET_READ_TIMEOUT_MS_KEY + "=TIME - timeout for socket reads " +
+        ", defaults to " + SOCKET_READ_TIMEOUT_MS_DEFAULT + " \n"+
+      "  -D" + MASTER_ADDRESSES_KEY + "=ADDRESSES - addresses to reach the Masters, " +
+        "defaults to " + MASTER_ADDRESSES_DEFAULT + " which is usually wrong.\n" +
+      "  -D " + NUM_REPLICAS_KEY + "=NUM - number of replicas to use when configuring a new " +
+        "table, defaults to " + NUM_REPLICAS_DEFAULT;
+  }
+}
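
As a usage illustration (not part of this commit), a driver that relies on
GenericOptionsParser so that -Dkudu.master.addresses=... reaches this class might look as
follows; the class name and the tableExists() check are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.kududb.client.KuduClient;
    import org.kududb.mapreduce.CommandLineParser;

    public class KuduToolDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // GenericOptionsParser folds -D options into the Configuration.
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (remainingArgs.length < 1) {
          System.err.println("usage: KuduToolDriver <table>" + CommandLineParser.getHelpSnippet());
          System.exit(1);
        }
        CommandLineParser parser = new CommandLineParser(conf);
        KuduClient client = parser.getClient();
        try {
          System.out.println("table exists: " + client.tableExists(remainingArgs[0]));
        } finally {
          client.shutdown();
        }
      }
    }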

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
new file mode 100644
index 0000000..57593db
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.base.Preconditions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * Finds the Jar for a class. If the class is in a directory in the
+ * classpath, it creates a Jar on the fly with the contents of the directory
+ * and returns the path to that Jar. If a Jar is created, it is created in
+ * the system temporary directory.
+ *
+ * This file was forked from hbase/branches/master@4ce6f48.
+ */
+public class JarFinder {
+
+  private static void copyToZipStream(File file, ZipEntry entry,
+                                      ZipOutputStream zos) throws IOException {
+    InputStream is = new FileInputStream(file);
+    try {
+      zos.putNextEntry(entry);
+      byte[] arr = new byte[4096];
+      int read = is.read(arr);
+      while (read > -1) {
+        zos.write(arr, 0, read);
+        read = is.read(arr);
+      }
+    } finally {
+      try {
+        is.close();
+      } finally {
+        zos.closeEntry();
+      }
+    }
+  }
+
+  public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
+    throws IOException {
+    Preconditions.checkNotNull(relativePath, "relativePath");
+    Preconditions.checkNotNull(zos, "zos");
+
+    // by JAR spec, if there is a manifest, it must be the first entry in the
+    // ZIP.
+    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
+    if (!manifestFile.exists()) {
+      zos.putNextEntry(manifestEntry);
+      new Manifest().write(new BufferedOutputStream(zos));
+      zos.closeEntry();
+    } else {
+      copyToZipStream(manifestFile, manifestEntry, zos);
+    }
+    zos.closeEntry();
+    zipDir(dir, relativePath, zos, true);
+    zos.close();
+  }
+
+  private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
+                             boolean start) throws IOException {
+    String[] dirList = dir.list();
+    for (String aDirList : dirList) {
+      File f = new File(dir, aDirList);
+      if (!f.isHidden()) {
+        if (f.isDirectory()) {
+          if (!start) {
+            ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
+            zos.putNextEntry(dirEntry);
+            zos.closeEntry();
+          }
+          String filePath = f.getPath();
+          File file = new File(filePath);
+          zipDir(file, relativePath + f.getName() + "/", zos, false);
+        }
+        else {
+          String path = relativePath + f.getName();
+          if (!path.equals(JarFile.MANIFEST_NAME)) {
+            ZipEntry anEntry = new ZipEntry(path);
+            copyToZipStream(f, anEntry, zos);
+          }
+        }
+      }
+    }
+  }
+
+  private static void createJar(File dir, File jarFile) throws IOException {
+    Preconditions.checkNotNull(dir, "dir");
+    Preconditions.checkNotNull(jarFile, "jarFile");
+    File jarDir = jarFile.getParentFile();
+    if (!jarDir.exists()) {
+      if (!jarDir.mkdirs()) {
+        throw new IOException(MessageFormat.format("could not create dir [{0}]",
+          jarDir));
+      }
+    }
+    JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile));
+    jarDir(dir, "", zos);
+  }
+
+  /**
+   * Returns the full path to the Jar containing the class. It always returns a
+   * JAR.
+   *
+   * @param klass class.
+   *
+   * @return path to the Jar containing the class.
+   */
+  public static String getJar(Class klass) {
+    Preconditions.checkNotNull(klass, "klass");
+    ClassLoader loader = klass.getClassLoader();
+    if (loader != null) {
+      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
+      try {
+        for (Enumeration itr = loader.getResources(class_file);
+             itr.hasMoreElements(); ) {
+          URL url = (URL) itr.nextElement();
+          String path = url.getPath();
+          if (path.startsWith("file:")) {
+            path = path.substring("file:".length());
+          }
+          path = URLDecoder.decode(path, "UTF-8");
+          if ("jar".equals(url.getProtocol())) {
+            path = URLDecoder.decode(path, "UTF-8");
+            return path.replaceAll("!.*$", "");
+          }
+          else if ("file".equals(url.getProtocol())) {
+            String klassName = klass.getName();
+            klassName = klassName.replace(".", "/") + ".class";
+            path = path.substring(0, path.length() - klassName.length());
+            File baseDir = new File(path);
+            File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
+            testDir = testDir.getAbsoluteFile();
+            if (!testDir.exists()) {
+              testDir.mkdirs();
+            }
+            File tempJar = File.createTempFile("hadoop-", "", testDir);
+            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+            tempJar.deleteOnExit();
+            createJar(baseDir, tempJar);
+            return tempJar.getAbsolutePath();
+          }
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+}
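
A brief note on intended use, with a hedged sketch: when a job's classes come from a build
directory rather than a packaged jar (common in tests), getJar() materializes a temporary
jar so the class can still be shipped with the job. The class chosen here is illustrative:

    // If KuduTableInputFormat was loaded from a jar, this returns that jar's path;
    // if it was loaded from a directory, a temporary jar is created under
    // ${test.build.dir} (default "target/test-dir") and its path is returned.
    String jar = JarFinder.getJar(org.kududb.mapreduce.KuduTableInputFormat.class);
    System.out.println("KuduTableInputFormat ships in: " + jar);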

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
new file mode 100644
index 0000000..25235cb
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -0,0 +1,444 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.kududb.mapreduce;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.naming.NamingException;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
+import org.kududb.Common;
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.Bytes;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduTable;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.KuduScanToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This input format generates one split per tablet and the only location for each split is that
+ * tablet's leader.
+ * </p>
+ *
+ * <p>
+ * Hadoop doesn't have the concept of "closing" the input format so in order to release the
+ * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
+ * the object won't be used again and the AsyncKuduClient is shut down.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
+    implements Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
+
+  /** Job parameter that specifies if the scanner should cache blocks or not (default: false). */
+  static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
+
+  /** Job parameter that specifies where the masters are. */
+  static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
+
+  /** Job parameter that specifies how long we wait for operations to complete (default: 10s). */
+  static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
+
+  /** Job parameter that specifies the address for the name server. */
+  static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
+
+  /** Job parameter that specifies the encoded column predicates (may be empty). */
+  static final String ENCODED_PREDICATES_KEY =
+      "kudu.mapreduce.encoded.predicates";
+
+  /**
+   * Job parameter that specifies the column projection as a comma-separated list of column names.
+   *
+   * Not specifying this at all (i.e. setting to null) or setting to the special string
+   * '*' means to project all columns.
+   *
+   * Specifying the empty string means to project no columns (i.e. just count the rows).
+   */
+  static final String COLUMN_PROJECTION_KEY = "kudu.mapreduce.column.projection";
+
+  /**
+   * The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
+   * used in order to not do DNS lookups multiple times for each tablet server.
+   */
+  private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
+
+  private Configuration conf;
+  private KuduClient client;
+  private KuduTable table;
+  private long operationTimeoutMs;
+  private String nameServer;
+  private boolean cacheBlocks;
+  private List<String> projectedCols;
+  private List<KuduPredicate> predicates;
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    try {
+      if (table == null) {
+        throw new IOException("No table was provided");
+      }
+
+      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
+                                                      .setProjectedColumnNames(projectedCols)
+                                                      .cacheBlocks(cacheBlocks)
+                                                      .setTimeout(operationTimeoutMs);
+      for (KuduPredicate predicate : predicates) {
+        tokenBuilder.addPredicate(predicate);
+      }
+      List<KuduScanToken> tokens = tokenBuilder.build();
+
+      List<InputSplit> splits = new ArrayList<>(tokens.size());
+      for (KuduScanToken token : tokens) {
+        List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+        for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+          locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
+        }
+        splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
+      }
+      return splits;
+    } finally {
+      shutdownClient();
+    }
+  }
+
+  private void shutdownClient() throws IOException {
+    try {
+      client.shutdown();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * This method might seem alien, but we do this in order to resolve the hostnames the same way
+   * Hadoop does. This ensures we get locality if Kudu is running alongside MR/YARN.
+   * @param host hostname we got from the master
+   * @param port port we got from the master
+   * @return reverse DNS'd address
+   */
+  private String reverseDNS(String host, Integer port) {
+    String location = this.reverseDNSCacheMap.get(host);
+    if (location != null) {
+      return location;
+    }
+    // The below InetSocketAddress creation does a name resolution.
+    InetSocketAddress isa = new InetSocketAddress(host, port);
+    if (isa.isUnresolved()) {
+      LOG.warn("Failed address resolve for: " + isa);
+    }
+    InetAddress tabletInetAddress = isa.getAddress();
+    try {
+      location = domainNamePointerToHostName(
+          DNS.reverseDns(tabletInetAddress, this.nameServer));
+      this.reverseDNSCacheMap.put(host, location);
+    } catch (NamingException e) {
+      LOG.warn("Cannot resolve the host name for " + tabletInetAddress + " because of " + e);
+      location = host;
+    }
+    return location;
+  }
+
+  @Override
+  public RecordReader<NullWritable, RowResult> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    return new TableRecordReader();
+  }
+
+  @Override
+  public void setConf(Configuration entries) {
+    this.conf = new Configuration(entries);
+
+    String tableName = conf.get(INPUT_TABLE_KEY);
+    String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
+    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+                                           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+    this.nameServer = conf.get(NAME_SERVER_KEY);
+    this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
+
+    this.client = new KuduClient.KuduClientBuilder(masterAddresses)
+                                .defaultOperationTimeoutMs(operationTimeoutMs)
+                                .build();
+    try {
+      this.table = client.openTable(tableName);
+    } catch (Exception ex) {
+      throw new RuntimeException("Could not obtain the table from the master, " +
+          "is the master running and is this table created? tablename=" + tableName + " and " +
+          "master address= " + masterAddresses, ex);
+    }
+
+    String projectionConfig = conf.get(COLUMN_PROJECTION_KEY);
+    if (projectionConfig == null || projectionConfig.equals("*")) {
+      this.projectedCols = null; // project the whole table
+    } else if ("".equals(projectionConfig)) {
+      this.projectedCols = new ArrayList<>();
+    } else {
+      this.projectedCols = Lists.newArrayList(Splitter.on(',').split(projectionConfig));
+
+      // Verify that the column names are valid -- better to fail with an exception
+      // before we submit the job.
+      Schema tableSchema = table.getSchema();
+      for (String columnName : projectedCols) {
+        if (tableSchema.getColumn(columnName) == null) {
+          throw new IllegalArgumentException("Unknown column " + columnName);
+        }
+      }
+    }
+
+    this.predicates = new ArrayList<>();
+    try {
+      InputStream is =
+          new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
+      while (is.available() > 0) {
+        this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
+                                                 Common.ColumnPredicatePB.parseDelimitedFrom(is)));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("unable to deserialize predicates from the configuration", e);
+    }
+  }
+
+  /**
+   * Given a PTR string generated via reverse DNS lookup, return everything
+   * except the trailing period. Example for host.example.com., return
+   * host.example.com
+   * @param dnPtr a domain name pointer (PTR) string.
+   * @return Sanitized hostname with last period stripped off.
+   *
+   */
+  private static String domainNamePointerToHostName(String dnPtr) {
+    if (dnPtr == null)
+      return null;
+    return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
+
+    /** The scan token that the split will use to scan the Kudu table. */
+    private byte[] scanToken;
+
+    /** The start partition key of the scan. Used for sorting splits. */
+    private byte[] partitionKey;
+
+    /** Tablet server locations which host the tablet to be scanned. */
+    private String[] locations;
+
+    public TableSplit() { } // Writable
+
+    public TableSplit(KuduScanToken token, String[] locations) throws IOException {
+      this.scanToken = token.serialize();
+      this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
+      this.locations = locations;
+    }
+
+    public byte[] getScanToken() {
+      return scanToken;
+    }
+
+    public byte[] getPartitionKey() {
+      return partitionKey;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      // TODO Guesstimate a size
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return locations;
+    }
+
+    @Override
+    public int compareTo(TableSplit other) {
+      return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      Bytes.writeByteArray(dataOutput, scanToken);
+      Bytes.writeByteArray(dataOutput, partitionKey);
+      dataOutput.writeInt(locations.length);
+      for (String location : locations) {
+        byte[] str = Bytes.fromString(location);
+        Bytes.writeByteArray(dataOutput, str);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      scanToken = Bytes.readByteArray(dataInput);
+      partitionKey = Bytes.readByteArray(dataInput);
+      locations = new String[dataInput.readInt()];
+      for (int i = 0; i < locations.length; i++) {
+        byte[] str = Bytes.readByteArray(dataInput);
+        locations[i] = Bytes.getString(str);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      // We currently just care about the partition key since we're within the same table.
+      return Arrays.hashCode(partitionKey);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TableSplit that = (TableSplit) o;
+
+      return this.compareTo(that) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+                    .add("partitionKey", Bytes.pretty(partitionKey))
+                    .add("locations", Arrays.toString(locations))
+                    .toString();
+    }
+  }
+
+  class TableRecordReader extends RecordReader<NullWritable, RowResult> {
+
+    private final NullWritable currentKey = NullWritable.get();
+    private RowResult currentValue;
+    private RowResultIterator iterator;
+    private KuduScanner scanner;
+    private TableSplit split;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      if (!(inputSplit instanceof TableSplit)) {
+        throw new IllegalArgumentException("TableSplit is the only accepted input split");
+      }
+
+      split = (TableSplit) inputSplit;
+
+      try {
+        scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      // Calling this now to set iterator.
+      tryRefreshIterator();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (!iterator.hasNext()) {
+        tryRefreshIterator();
+        if (!iterator.hasNext()) {
+          // Means we still have the same iterator, we're done
+          return false;
+        }
+      }
+      currentValue = iterator.next();
+      return true;
+    }
+
+    /**
+     * If the scanner has more rows, get a new iterator else don't do anything.
+     * @throws IOException
+     */
+    private void tryRefreshIterator() throws IOException {
+      if (!scanner.hasMoreRows()) {
+        return;
+      }
+      try {
+        iterator = scanner.nextRows();
+      } catch (Exception e) {
+        throw new IOException("Couldn't get scan data", e);
+      }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public RowResult getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // TODO Guesstimate progress
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        scanner.close();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      shutdownClient();
+    }
+  }
+}
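
Since this input format emits NullWritable keys and RowResult values, a consuming mapper is
straightforward. A minimal sketch (the class and output types are illustrative, not part of
this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.kududb.client.RowResult;

    public class RowCountMapper extends Mapper<NullWritable, RowResult, Text, LongWritable> {
      private static final Text ROWS = new Text("rows");
      private static final LongWritable ONE = new LongWritable(1);

      @Override
      protected void map(NullWritable key, RowResult value, Context context)
          throws IOException, InterruptedException {
        // Each RowResult is one scanned row; project no columns to just count.
        context.write(ROWS, ONE);
      }
    }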

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
new file mode 100644
index 0000000..0b919d9
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
@@ -0,0 +1,541 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.kududb.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.StringUtils;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.ColumnRangePredicate;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+
+/**
+ * Utility class to setup MR jobs that use Kudu as an input and/or output.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableMapReduceUtil {
+  // Mostly lifted from HBase's TableMapReduceUtil
+
+  private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class);
+
+  /**
+   * Doesn't need instantiation
+   */
+  private KuduTableMapReduceUtil() { }
+
+
+  /**
+   * Base class for MR I/O formats, contains the common configurations.
+   */
+  private static abstract class AbstractMapReduceConfigurator<S> {
+    protected final Job job;
+    protected final String table;
+
+    protected boolean addDependencies = true;
+
+    /**
+     * Constructor for the required fields to configure.
+     * @param job a job to configure
+     * @param table a string that contains the name of the table to read from
+     */
+    private AbstractMapReduceConfigurator(Job job, String table) {
+      this.job = job;
+      this.table = table;
+    }
+
+    /**
+     * Sets whether this job should add Kudu's dependencies to the distributed cache. Turned on
+     * by default.
+     * @param addDependencies a boolean that says if we should add the dependencies
+     * @return this instance
+     */
+    @SuppressWarnings("unchecked")
+    public S addDependencies(boolean addDependencies) {
+      this.addDependencies = addDependencies;
+      return (S) this;
+    }
+
+    /**
+     * Configures the job using the passed parameters.
+     * @throws IOException If addDependencies is enabled and a problem is encountered reading
+     * files on the filesystem
+     */
+    public abstract void configure() throws IOException;
+  }
+
+  /**
+   * Builder-like class that sets up the required configurations and classes to write to Kudu.
+   * <p>
+   * Use either of the child classes when configuring the table output format.
+   */
+  private static abstract class AbstractTableOutputFormatConfigurator
+      <S extends AbstractTableOutputFormatConfigurator<? super S>>
+      extends AbstractMapReduceConfigurator<S> {
+
+    protected String masterAddresses;
+    protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+
+    /**
+     * {@inheritDoc}
+     */
+    private AbstractTableOutputFormatConfigurator(Job job, String table) {
+      super(job, table);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void configure() throws IOException {
+      job.setOutputFormatClass(KuduTableOutputFormat.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(Operation.class);
+
+      Configuration conf = job.getConfiguration();
+      conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
+      conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, table);
+      conf.setLong(KuduTableOutputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
+      if (addDependencies) {
+        addDependencyJars(job);
+      }
+    }
+  }
+
+  /**
+   * Builder-like class that sets up the required configurations and classes to read from Kudu.
+   * By default, block caching is disabled.
+   * <p>
+   * Use either of the child classes when configuring the table input format.
+   */
+  private static abstract class AbstractTableInputFormatConfigurator
+      <S extends AbstractTableInputFormatConfigurator<? super S>>
+      extends AbstractMapReduceConfigurator<S> {
+
+    protected String masterAddresses;
+    protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+    protected final String columnProjection;
+    protected boolean cacheBlocks;
+    protected List<KuduPredicate> predicates = new ArrayList<>();
+
+    /**
+     * Constructor for the required fields to configure.
+     * @param job a job to configure
+     * @param table a string that contains the name of the table to read from
+     * @param columnProjection a string containing a comma-separated list of columns to read.
+     *                         It can be null in which case we read empty rows
+     */
+    private AbstractTableInputFormatConfigurator(Job job, String table, String columnProjection) {
+      super(job, table);
+      this.columnProjection = columnProjection;
+    }
+
+    /**
+     * Sets the block caching configuration for the scanners. Turned off by default.
+     * @param cacheBlocks whether the job should use scanners that cache blocks.
+     * @return this instance
+     */
+    public S cacheBlocks(boolean cacheBlocks) {
+      this.cacheBlocks = cacheBlocks;
+      return (S) this;
+    }
+
+    /**
+     * Configures the job with all the passed parameters.
+     * @throws IOException If addDependencies is enabled and a problem is encountered reading
+     * files on the filesystem
+     */
+    public void configure() throws IOException {
+      job.setInputFormatClass(KuduTableInputFormat.class);
+
+      Configuration conf = job.getConfiguration();
+
+      conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
+      conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, table);
+      conf.setLong(KuduTableInputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
+      conf.setBoolean(KuduTableInputFormat.SCAN_CACHE_BLOCKS, cacheBlocks);
+
+      if (columnProjection != null) {
+        conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
+      }
+
+      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
+
+      if (addDependencies) {
+        addDependencyJars(job);
+      }
+    }
+  }
+
+  /**
+   * Returns the provided predicates as a Base64 encoded string.
+   * @param predicates the predicates to encode
+   * @return the encoded predicates
+   */
+  static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    for (KuduPredicate predicate : predicates) {
+      predicate.toPB().writeDelimitedTo(baos);
+    }
+    return Base64.encodeBase64String(baos.toByteArray());
+  }
+
+
+  /**
+   * Table output format configurator to use to specify the parameters directly.
+   */
+  public static class TableOutputFormatConfigurator
+      extends AbstractTableOutputFormatConfigurator<TableOutputFormatConfigurator> {
+
+    /**
+     * Constructor for the required fields to configure.
+     * @param job a job to configure
+     * @param table a string that contains the name of the table to read from
+     * @param masterAddresses a comma-separated list of masters' hosts and ports
+     */
+    public TableOutputFormatConfigurator(Job job, String table, String masterAddresses) {
+      super(job, table);
+      this.masterAddresses = masterAddresses;
+    }
+
+    /**
+     * Sets the timeout for all the operations. The default is 10 seconds.
+     * @param operationTimeoutMs a long that represents the timeout for operations to complete,
+     *                           must be a positive value or 0
+     * @return this instance
+     * @throws IllegalArgumentException if the operation timeout is less than 0
+     */
+    public TableOutputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
+      if (operationTimeoutMs < 0) {
+        throw new IllegalArgumentException("The operation timeout must be => 0, " +
+            "passed value is: " + operationTimeoutMs);
+      }
+      this.operationTimeoutMs = operationTimeoutMs;
+      return this;
+    }
+  }
+
+  /**
+   * Table output format that uses a {@link CommandLineParser} in order to set the
+   * master addresses and the operation timeout.
+   */
+  public static class TableOutputFormatConfiguratorWithCommandLineParser extends
+      AbstractTableOutputFormatConfigurator<TableOutputFormatConfiguratorWithCommandLineParser> {
+
+    /**
+     * {@inheritDoc}
+     */
+    public TableOutputFormatConfiguratorWithCommandLineParser(Job job, String table) {
+      super(job, table);
+      CommandLineParser parser = new CommandLineParser(job.getConfiguration());
+      this.masterAddresses = parser.getMasterAddresses();
+      this.operationTimeoutMs = parser.getOperationTimeoutMs();
+    }
+  }
+
+  /**
+   * Table input format configurator to use to specify the parameters directly.
+   */
+  public static class TableInputFormatConfigurator
+      extends AbstractTableInputFormatConfigurator<TableInputFormatConfigurator> {
+
+    /**
+     * Constructor for the required fields to configure.
+     * @param job a job to configure
+     * @param table a string that contains the name of the table to read from
+     * @param columnProjection a string containing a comma-separated list of columns to read.
+     *                         It can be null, in which case empty rows are read
+     * @param masterAddresses a comma-separated list of masters' hosts and ports
+     */
+    public TableInputFormatConfigurator(Job job, String table, String columnProjection,
+                                        String masterAddresses) {
+      super(job, table, columnProjection);
+      this.masterAddresses = masterAddresses;
+    }
+
+    /**
+     * Sets the timeout for all the operations. The default is 10 seconds.
+     * @param operationTimeoutMs a long that represents the timeout for operations to complete,
+     *                           must be a positive value or 0
+     * @return this instance
+     * @throws IllegalArgumentException if the operation timeout is less than 0
+     */
+    public TableInputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
+      if (operationTimeoutMs < 0) {
+        throw new IllegalArgumentException("The operation timeout must be >= 0, " +
+            "passed value is: " + operationTimeoutMs);
+      }
+      this.operationTimeoutMs = operationTimeoutMs;
+      return this;
+    }
+
+    /**
+     * Adds a new predicate that will be pushed down to all the tablets.
+     * @param predicate a predicate to add
+     * @return this instance
+     * @deprecated use {@link #addPredicate}
+     */
+    @Deprecated
+    public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
+      return addPredicate(predicate.toKuduPredicate());
+    }
+
+    /**
+     * Adds a new predicate that will be pushed down to all the tablets.
+     * @param predicate a predicate to add
+     * @return this instance
+     */
+    public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
+      this.predicates.add(predicate);
+      return this;
+    }
+  }
+
+  /**
+   * Table input format that uses a {@link CommandLineParser} in order to set the
+   * master addresses and the operation timeout.
+   * This version cannot set scan predicates.
+   */
+  public static class TableInputFormatConfiguratorWithCommandLineParser extends
+      AbstractTableInputFormatConfigurator<TableInputFormatConfiguratorWithCommandLineParser> {
+
+    /**
+     * {@inheritDoc}
+     */
+    public TableInputFormatConfiguratorWithCommandLineParser(Job job,
+                                                             String table,
+                                                             String columnProjection) {
+      super(job, table, columnProjection);
+      CommandLineParser parser = new CommandLineParser(job.getConfiguration());
+      this.masterAddresses = parser.getMasterAddresses();
+      this.operationTimeoutMs = parser.getOperationTimeoutMs();
+    }
+  }
+
+  /**
+   * Use this method when setting up a task to get access to the KuduTable in order to create
+   * Inserts, Updates, and Deletes.
+   * @param context Map context
+   * @return The kudu table object as set up by the output format
+   */
+  @SuppressWarnings("rawtypes")
+  public static KuduTable getTableFromContext(TaskInputOutputContext context) {
+    String multitonKey = context.getConfiguration().get(KuduTableOutputFormat.MULTITON_KEY);
+    return KuduTableOutputFormat.getKuduTable(multitonKey);
+  }
+
+  /**
+   * Add the Kudu dependency jars as well as jars for any of the configured
+   * job classes to the job configuration, so that JobClient will ship them
+   * to the cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    addKuduDependencyJars(job.getConfiguration());
+    try {
+      addDependencyJars(job.getConfiguration(),
+          // when making changes here, consider also mapred.TableMapReduceUtil
+          // pull job classes
+          job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(),
+          job.getInputFormatClass(),
+          job.getOutputKeyClass(),
+          job.getOutputValueClass(),
+          job.getOutputFormatClass(),
+          job.getPartitionerClass(),
+          job.getCombinerClass());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration
+   * such that JobClient will ship them to the cluster and add them to
+   * the DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf,
+                                       Class<?>... classes) throws IOException {
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // Add jars as we find them to a map of contents to jar name so that we can avoid
+    // creating new jars for classes that have already been packaged.
+    Map<String, String> packagedClasses = new HashMap<String, String>();
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) continue;
+
+      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+      if (path == null) {
+        LOG.warn("Could not find jar for class " + clazz +
+            " in order to ship it to the cluster.");
+        continue;
+      }
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class "
+            + clazz);
+        continue;
+      }
+      jars.add(path.toString());
+    }
+    if (jars.isEmpty()) return;
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+  }
+
+  /**
+   * Add Kudu and its dependencies (only) to the job configuration.
+   * <p>
+   * This is intended as a low-level API, facilitating code reuse between this
+   * class and its mapred counterpart. It is also of use to external tools that
+   * need to build a MapReduce job that interacts with Kudu but want
+   * fine-grained control over the jars shipped to the cluster.
+   * </p>
+   * @param conf The Configuration object to extend with dependencies.
+   * @see KuduTableMapReduceUtil
+   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
+   */
+  public static void addKuduDependencyJars(Configuration conf) throws IOException {
+    addDependencyJars(conf,
+        // explicitly pull a class from each module
+        Operation.class,                      // kudu-client
+        KuduTableMapReduceUtil.class,   // kudu-mapreduce
+        // pull necessary dependencies
+        com.stumbleupon.async.Deferred.class);
+  }
+
+  /**
+   * Finds the Jar for a class or creates it if it doesn't exist. If the class
+   * is in a directory in the classpath, it creates a Jar on the fly with the
+   * contents of the directory and returns the path to that Jar. If a Jar is
+   * created, it is created in the system temporary directory. Otherwise,
+   * returns an existing jar that contains a class of the same name. Maintains
+   * a mapping from jar contents to the tmp jar created.
+   * @param my_class the class to find.
+   * @param fs the FileSystem with which to qualify the returned path.
+   * @param packagedClasses a map of class name to path.
+   * @return a jar file that contains the class.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+                                      Map<String, String> packagedClasses)
+      throws IOException {
+    // attempt to locate an existing jar for the class.
+    String jar = findContainingJar(my_class, packagedClasses);
+    if (null == jar || jar.isEmpty()) {
+      jar = JarFinder.getJar(my_class);
+      updateMap(jar, packagedClasses);
+    }
+
+    if (null == jar || jar.isEmpty()) {
+      return null;
+    }
+
+    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
+    return new Path(jar).makeQualified(fs);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return
+   * a jar file, even if that is not the first thing on the class path that
+   * has a class with the same name. Looks first on the classpath and then in
+   * the <code>packagedClasses</code> map.
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+      throws IOException {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+    // first search the classpath
+    for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+      URL url = itr.nextElement();
+      if ("jar".equals(url.getProtocol())) {
+        String toReturn = url.getPath();
+        if (toReturn.startsWith("file:")) {
+          toReturn = toReturn.substring("file:".length());
+        }
+        // URLDecoder is a misnamed class, since it actually decodes
+        // x-www-form-urlencoded MIME type rather than actual
+        // URL encoding (which the file path has). Therefore it would
+        // decode +s to ' 's which is incorrect (spaces are actually
+        // either unencoded or encoded as "%20"). Replace +s first, so
+        // that they are kept sacred during the decoding process.
+        toReturn = toReturn.replaceAll("\\+", "%2B");
+        toReturn = URLDecoder.decode(toReturn, "UTF-8");
+        return toReturn.replaceAll("!.*$", "");
+      }
+    }
+
+    // now look in any jars we've packaged using JarFinder. Returns null when
+    // no jar is found.
+    return packagedClasses.get(class_file);
+  }
+
+  /**
+   * Add entries to <code>packagedClasses</code> corresponding to class files
+   * contained in <code>jar</code>.
+   * @param jar The jar whose contents to list.
+   * @param packagedClasses map[class -> jar]
+   */
+  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
+    if (null == jar || jar.isEmpty()) {
+      return;
+    }
+    ZipFile zip = null;
+    try {
+      zip = new ZipFile(jar);
+      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+        ZipEntry entry = iter.nextElement();
+        if (entry.getName().endsWith("class")) {
+          packagedClasses.put(entry.getName(), jar);
+        }
+      }
+    } finally {
+      if (null != zip) zip.close();
+    }
+  }
+}
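
A minimal usage sketch for the configurators above (illustrative only, not part of
this commit): the table names, master address, and column schema are assumptions,
and the output-side configure() is assumed to mirror the input-side configure()
shown above. Packages follow the org.kududb names still declared in this part of
the repackage.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.kududb.ColumnSchema;
    import org.kududb.Type;
    import org.kududb.client.KuduPredicate;
    import org.kududb.mapreduce.KuduTableMapReduceUtil;

    public class ConfiguratorExample {
      public static Job buildJob() throws Exception {
        Job job = Job.getInstance(new Configuration(), "kudu-copy");

        // Read side: project two columns and push one predicate down to the tablets.
        ColumnSchema keyCol =
            new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).key(true).build();
        new KuduTableMapReduceUtil.TableInputFormatConfigurator(
                job, "source_table", "key,value", "master-1:7051")
            .operationTimeoutMs(60000)
            .addPredicate(KuduPredicate.newComparisonPredicate(
                keyCol, KuduPredicate.ComparisonOp.GREATER_EQUAL, 100L))
            .configure();

        // Write side: the record writer then expects (NullWritable, Operation) pairs.
        new KuduTableMapReduceUtil.TableOutputFormatConfigurator(
                job, "sink_table", "master-1:7051")
            .configure();
        return job;
      }
    }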

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
new file mode 100644
index 0000000..8af750b
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * Small committer class that does not do anything.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableOutputCommitter extends OutputCommitter {
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
new file mode 100644
index 0000000..e80b73f
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>
+ * Use {@link
+ * KuduTableMapReduceUtil.TableOutputFormatConfigurator}
+ * to correctly setup this output format, then {@link
+ * KuduTableMapReduceUtil#getTableFromContext(org.apache.hadoop.mapreduce.TaskInputOutputContext)}
+ * to get a KuduTable.
+ * </p>
+ *
+ * <p>
+ * Hadoop doesn't have the concept of "closing" an output format, so in order to release
+ * resources we assume that once either
+ * {@link #checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link TableRecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)}
+ * has been called, the object won't be used again and the KuduClient is shut down.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation>
+    implements Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduTableOutputFormat.class);
+
+  /** Job parameter that specifies the output table. */
+  static final String OUTPUT_TABLE_KEY = "kudu.mapreduce.output.table";
+
+  /** Job parameter that specifies where the masters are */
+  static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.addresses";
+
+  /** Job parameter that specifies how long we wait for operations to complete */
+  static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
+
+  /** Number of rows that are buffered before flushing to the tablet server */
+  static final String BUFFER_ROW_COUNT_KEY = "kudu.mapreduce.buffer.row.count";
+
+  /**
+   * Job parameter that specifies which key is to be used to reach the KuduTableOutputFormat
+   * belonging to the caller
+   */
+  static final String MULTITON_KEY = "kudu.mapreduce.multitonkey";
+
+  /**
+   * This multiton is used so that the tasks using this output format/record writer can find
+   * their KuduTable without having a direct dependency on this class,
+   * with the additional complexity that the output format cannot be shared between threads.
+   */
+  private static final ConcurrentHashMap<String, KuduTableOutputFormat> MULTITON = new
+      ConcurrentHashMap<String, KuduTableOutputFormat>();
+
+  /**
+   * This counter helps indicate which task log to look at since rows that weren't applied will
+   * increment this counter.
+   */
+  public enum Counters { ROWS_WITH_ERRORS }
+
+  private Configuration conf = null;
+
+  private KuduClient client;
+  private KuduTable table;
+  private KuduSession session;
+  private long operationTimeoutMs;
+
+  @Override
+  public void setConf(Configuration entries) {
+    this.conf = new Configuration(entries);
+
+    String masterAddress = this.conf.get(MASTER_ADDRESSES_KEY);
+    String tableName = this.conf.get(OUTPUT_TABLE_KEY);
+    this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+        AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+    int bufferSpace = this.conf.getInt(BUFFER_ROW_COUNT_KEY, 1000);
+
+    this.client = new KuduClient.KuduClientBuilder(masterAddress)
+        .defaultOperationTimeoutMs(operationTimeoutMs)
+        .build();
+    try {
+      this.table = client.openTable(tableName);
+    } catch (Exception ex) {
+      throw new RuntimeException("Could not obtain the table from the master, " +
+          "is the master running and is this table created? tablename=" + tableName + " and " +
+          "master address= " + masterAddress, ex);
+    }
+    this.session = client.newSession();
+    this.session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+    this.session.setMutationBufferSpace(bufferSpace);
+    this.session.setIgnoreAllDuplicateRows(true);
+    String multitonKey = String.valueOf(Thread.currentThread().getId());
+    assert(MULTITON.get(multitonKey) == null);
+    MULTITON.put(multitonKey, this);
+    entries.set(MULTITON_KEY, multitonKey);
+  }
+
+  private void shutdownClient() throws IOException {
+    try {
+      client.shutdown();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static KuduTable getKuduTable(String multitonKey) {
+    return MULTITON.get(multitonKey).getKuduTable();
+  }
+
+  private KuduTable getKuduTable() {
+    return this.table;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public RecordWriter<NullWritable, Operation> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new TableRecordWriter(this.session);
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+    shutdownClient();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws
+      IOException, InterruptedException {
+    return new KuduTableOutputCommitter();
+  }
+
+  protected class TableRecordWriter extends RecordWriter<NullWritable, Operation> {
+
+    private final AtomicLong rowsWithErrors = new AtomicLong();
+    private final KuduSession session;
+
+    public TableRecordWriter(KuduSession session) {
+      this.session = session;
+    }
+
+    @Override
+    public void write(NullWritable key, Operation operation)
+        throws IOException, InterruptedException {
+      try {
+        session.apply(operation);
+      } catch (Exception e) {
+        throw new IOException("Encountered an error while writing", e);
+      }
+    }
+
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) throws IOException,
+        InterruptedException {
+      try {
+        processRowErrors(session.close());
+        shutdownClient();
+      } catch (Exception e) {
+        throw new IOException("Encountered an error while closing this task", e);
+      } finally {
+        if (taskAttemptContext != null) {
+          // This is the only place where we have access to the context in the record writer,
+          // so set the counter here.
+          taskAttemptContext.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
+        }
+      }
+    }
+
+    private void processRowErrors(List<OperationResponse> responses) {
+      List<RowError> errors = OperationResponse.collectErrors(responses);
+      if (!errors.isEmpty()) {
+        int rowErrorsCount = errors.size();
+        rowsWithErrors.addAndGet(rowErrorsCount);
+        LOG.warn("Got per errors for " + rowErrorsCount + " rows, " +
+            "the first one being " + errors.get(0).getStatus());
+      }
+    }
+  }
+}
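
A hedged sketch of a task using this output format through
KuduTableMapReduceUtil.getTableFromContext(), as the class doc describes; the
mapper's input types and the "payload" column are assumptions for illustration.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.kududb.client.Insert;
    import org.kududb.client.KuduTable;
    import org.kududb.client.Operation;
    import org.kududb.mapreduce.KuduTableMapReduceUtil;

    public class KuduWritingMapper
        extends Mapper<LongWritable, Text, NullWritable, Operation> {

      private KuduTable table;

      @Override
      protected void setup(Context context) {
        // setConf() registered the table under a multiton key; this helper
        // looks it up without a direct dependency on the output format class.
        table = KuduTableMapReduceUtil.getTableFromContext(context);
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        Insert insert = table.newInsert();
        insert.getRow().addString("payload", value.toString());  // assumed column
        // The record writer applies the operation on the shared session; rows
        // that fail surface through the ROWS_WITH_ERRORS counter at close().
        context.write(NullWritable.get(), insert);
      }
    }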

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
new file mode 100644
index 0000000..7cf3ada
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.Operation;
+import org.apache.hadoop.mapreduce.Reducer;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+    extends Reducer<KEYIN, VALUEIN, KEYOUT, Operation> {
+}
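
A minimal sketch of a concrete reducer built on this class, assuming a
word-count style job and a (word, count) table schema:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.kududb.client.Insert;
    import org.kududb.client.KuduTable;
    import org.kududb.mapreduce.KuduTableMapReduceUtil;
    import org.kududb.mapreduce.TableReducer;

    public class CountingReducer extends TableReducer<Text, IntWritable, NullWritable> {

      @Override
      protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
          sum += c.get();
        }
        // Emit an Insert; the Kudu output format applies it for us.
        KuduTable table = KuduTableMapReduceUtil.getTableFromContext(context);
        Insert insert = table.newInsert();
        insert.getRow().addString("word", word.toString());  // assumed schema
        insert.getRow().addInt("count", sum);
        context.write(NullWritable.get(), insert);
      }
    }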

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
deleted file mode 100644
index 05c18f2..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
+++ /dev/null
@@ -1,144 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.apache.hadoop.conf.Configuration;
-import org.kududb.client.KuduClient;
-
-/**
- * Utility class that manages common configurations to all MR jobs. For example,
- * any job that uses {#KuduTableMapReduceUtil} to setup an input or output format
- * and that has parsed the command line arguments with
- * {@link org.apache.hadoop.util.GenericOptionsParser} can simply be passed:
- * <code>
- * -Dmaster.address=ADDRESS
- * </code>
- * in order to specify where the master is.
- * Use {@link CommandLineParser#getHelpSnippet()} to provide usage text for the configurations
- * managed by this class.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class CommandLineParser {
-  private final Configuration conf;
-  public static final String MASTER_ADDRESSES_KEY = "kudu.master.addresses";
-  public static final String MASTER_ADDRESSES_DEFAULT = "127.0.0.1";
-  public static final String OPERATION_TIMEOUT_MS_KEY = "kudu.operation.timeout.ms";
-  public static final long OPERATION_TIMEOUT_MS_DEFAULT =
-      AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-  public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
-  public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
-  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
-      AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
-  public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
-  public static final int NUM_REPLICAS_DEFAULT = 3;
-
-  /**
-   * Constructor that uses a Configuration that has already been through
-   * {@link org.apache.hadoop.util.GenericOptionsParser}'s command line parsing.
-   * @param conf the configuration from which job configurations will be extracted
-   */
-  public CommandLineParser(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get the configured master's config.
-   * @return a string that contains the passed config, or the default value
-   */
-  public String getMasterAddresses() {
-    return conf.get(MASTER_ADDRESSES_KEY, MASTER_ADDRESSES_DEFAULT);
-  }
-
-  /**
-   * Get the configured timeout for operations on sessions and scanners.
-   * @return a long that represents the passed timeout, or the default value
-   */
-  public long getOperationTimeoutMs() {
-    return conf.getLong(OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
-  }
-
-  /**
-   * Get the configured timeout for admin operations.
-   * @return a long that represents the passed timeout, or the default value
-   */
-  public long getAdminOperationTimeoutMs() {
-    return conf.getLong(ADMIN_OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
-  }
-
-  /**
-   * Get the configured timeout for socket reads.
-   * @return a long that represents the passed timeout, or the default value
-   */
-  public long getSocketReadTimeoutMs() {
-    return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
-  }
-
-  /**
-   * Get the number of replicas to use when configuring a new table.
-   * @return an int that represents the passed number of replicas to use, or the default value.
-   */
-  public int getNumReplicas() {
-    return conf.getInt(NUM_REPLICAS_KEY, NUM_REPLICAS_DEFAULT);
-  }
-
-  /**
-   * Get an async client connected to the configured Master(s).
-   * @return an async kudu client
-   */
-  public AsyncKuduClient getAsyncClient() {
-    return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
-        .defaultOperationTimeoutMs(getOperationTimeoutMs())
-        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
-        .build();
-  }
-
-  /**
-   * Get a client connected to the configured Master(s).
-   * @return a kudu client
-   */
-  public KuduClient getClient() {
-    return new KuduClient.KuduClientBuilder(getMasterAddresses())
-        .defaultOperationTimeoutMs(getOperationTimeoutMs())
-        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
-        .build();
-  }
-
-  /**
-   * This method returns a single multi-line string that contains the help snippet to append to
-   * the tail of a usage() or help() type of method.
-   * @return a string with all the available configurations and their defaults
-   */
-  public static String getHelpSnippet() {
-    return "\nAdditionally, the following options are available:" +
-      "  -D" + OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for read and write " +
-          "operations, defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
-      "  -D" + ADMIN_OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for admin operations " +
-        ", defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
-      "  -D" + SOCKET_READ_TIMEOUT_MS_KEY + "=TIME - timeout for socket reads " +
-        ", defaults to " + SOCKET_READ_TIMEOUT_MS_DEFAULT + " \n"+
-      "  -D" + MASTER_ADDRESSES_KEY + "=ADDRESSES - addresses to reach the Masters, " +
-        "defaults to " + MASTER_ADDRESSES_DEFAULT + " which is usually wrong.\n" +
-      "  -D " + NUM_REPLICAS_KEY + "=NUM - number of replicas to use when configuring a new " +
-        "table, defaults to " + NUM_REPLICAS_DEFAULT;
-  }
-}
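
The class itself survives the repackage (the new configurators above still
reference it); only its on-disk location changes here. A hedged sketch of
reading the -D flags back through it, with illustrative flag values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.kududb.client.KuduClient;
    import org.kududb.mapreduce.CommandLineParser;

    public class ParserExample {
      public static void main(String[] args) throws Exception {
        // e.g. args = { "-D", "kudu.master.addresses=master-1:7051" }
        Configuration conf = new Configuration();
        String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();

        CommandLineParser parser = new CommandLineParser(conf);
        KuduClient client = parser.getClient();  // honors the parsed timeouts
        try {
          System.out.println("Connected to masters: " + parser.getMasterAddresses());
        } finally {
          client.shutdown();
        }
        // remaining[] carries the tool's own positional arguments, if any.
      }
    }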