You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:09 UTC
[04/36] incubator-kudu git commit: [java-client] repackage to
org.apache.kudu (Part 1)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
new file mode 100644
index 0000000..d732b71
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KuduSink} that write Flume events into tables created on the test
+ * cluster provided by {@link BaseKuduTest}.
+ */
+public class KuduSinkTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+
+  /**
+   * Creates a single-replica table whose only column, "payload", is a BINARY primary key
+   * that is also the range-partition column.
+   *
+   * @param tableName name of the table to create
+   * @return the created table
+   */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+                                .setNumReplicas(1);
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  /**
+   * Configuring the sink must throw NullPointerException both for an empty context and for
+   * a context that only sets the table name.
+   */
+  @Test
+  public void testMandatoryParameters() {
+    LOG.info("Testing mandatory parameters...");
+
+    KuduSink sink = new KuduSink(syncClient);
+
+    HashMap<String, String> parameters = new HashMap<>();
+    Context context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+      // Expected: no properties are set at all.
+    }
+
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+    context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+      // Expected: only the table name is set (no master addresses).
+    }
+
+    LOG.info("Testing mandatory parameters finished successfully.");
+  }
+
+  /** Starting a sink configured for a table that was never created must throw FlumeException. */
+  @Test(expected = FlumeException.class)
+  public void testMissingTable() throws Exception {
+    LOG.info("Testing missing table...");
+
+    KuduSink sink = createSink("missingTable");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    LOG.info("Testing missing table finished successfully.");
+  }
+
+  @Test
+  public void testEmptyChannelWithDefaults() throws Exception {
+    testEventsWithDefaults(0);
+  }
+
+  @Test
+  public void testOneEventWithDefaults() throws Exception {
+    testEventsWithDefaults(1);
+  }
+
+  @Test
+  public void testThreeEventsWithDefaults() throws Exception {
+    testEventsWithDefaults(3);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+    doTestDuplicateRows(true);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+    doTestDuplicateRows(false);
+  }
+
+  /**
+   * Puts two events with the same body (and therefore the same key) into the channel and
+   * runs the sink once.
+   *
+   * When duplicates are ignored, process() must return READY and the table must hold
+   * exactly one row. Otherwise process() must throw EventDeliveryException.
+   *
+   * @param ignoreDuplicateRows value for KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS
+   */
+  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
+    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+    String tableName = table.getName();
+    Context sinkContext = new Context();
+    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+                    Boolean.toString(ignoreDuplicateRows));
+    KuduSink sink = createSink(tableName, sinkContext);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < 2; i++) {
+      Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    // Only process() can raise EventDeliveryException, so the try covers just that call.
+    Sink.Status status;
+    try {
+      status = sink.process();
+    } catch (EventDeliveryException e) {
+      if (ignoreDuplicateRows) {
+        throw new AssertionError("Failed to ignore duplicate rows!", e);
+      }
+      LOG.info("Correctly did not ignore duplicate rows", e);
+      return;
+    }
+
+    if (!ignoreDuplicateRows) {
+      fail("Incorrectly ignored duplicate rows!");
+    }
+    assertEquals("incorrect status for non-empty channel", Sink.Status.READY, status);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals("1 row expected", 1, rows.size());
+
+    LOG.info("Testing duplicate events finished successfully.");
+  }
+
+  /**
+   * Sends {@code eventCount} events through a sink that has default settings.
+   *
+   * Checks the status returned by process(): BACKOFF for an empty channel, anything else
+   * otherwise. Then checks that one row per event, carrying the expected payload, was
+   * written.
+   *
+   * @param eventCount number of events to put into the channel
+   */
+  private void testEventsWithDefaults(int eventCount) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events");
+    String tableName = table.getName();
+    KuduSink sink = createSink(tableName);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < eventCount; i++) {
+      // Encode explicitly; String.getBytes() would depend on the platform default charset.
+      Event e = EventBuilder.withBody(String.format("payload body %s", i), Charsets.UTF_8);
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertEquals("incorrect status for empty channel", Sink.Status.BACKOFF, status);
+    } else {
+      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    }
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    // NOTE(review): assumes scanTableToStrings returns rows in key order. The byte order of
+    // "payload body <i>" matches i only while eventCount <= 10.
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  /** Creates a sink for {@code tableName} with no extra configuration. */
+  private KuduSink createSink(String tableName) {
+    return createSink(tableName, new Context());
+  }
+
+  /**
+   * Creates and configures a sink that targets {@code tableName} on the test cluster's
+   * masters.
+   *
+   * @param tableName table the sink writes to
+   * @param ctx extra sink properties; they are applied last, so they override the table
+   *            name and master addresses set here
+   * @return the configured (not yet started) sink
+   */
+  private KuduSink createSink(String tableName, Context ctx) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+    return sink;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
deleted file mode 100644
index d732b71..0000000
--- a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kududb.flume.sink;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Assert;
-import org.junit.Test;
-import org.kududb.ColumnSchema;
-import org.kududb.Schema;
-import org.kududb.Type;
-import org.kududb.client.BaseKuduTest;
-import org.kududb.client.CreateTableOptions;
-import org.kududb.client.KuduTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link KuduSink} that write Flume events into tables created on the test
- * cluster provided by {@link BaseKuduTest}.
- */
-public class KuduSinkTest extends BaseKuduTest {
-  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
-
-  /**
-   * Creates a single-replica table whose only column, "payload", is a BINARY primary key
-   * that is also the range-partition column.
-   *
-   * @param tableName name of the table to create
-   * @return the created table
-   */
-  private KuduTable createNewTable(String tableName) throws Exception {
-    LOG.info("Creating new table...");
-
-    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
-                                .setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
-
-    LOG.info("Created new table.");
-
-    return table;
-  }
-
-  /**
-   * Configuring the sink must throw NullPointerException both for an empty context and for
-   * a context that only sets the table name.
-   */
-  @Test
-  public void testMandatoryParameters() {
-    LOG.info("Testing mandatory parameters...");
-
-    KuduSink sink = new KuduSink(syncClient);
-
-    HashMap<String, String> parameters = new HashMap<>();
-    Context context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-      // Expected: no properties are set at all.
-    }
-
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
-    context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-      // Expected: only the table name is set (no master addresses).
-    }
-
-    LOG.info("Testing mandatory parameters finished successfully.");
-  }
-
-  /** Starting a sink configured for a table that was never created must throw FlumeException. */
-  @Test(expected = FlumeException.class)
-  public void testMissingTable() throws Exception {
-    LOG.info("Testing missing table...");
-
-    KuduSink sink = createSink("missingTable");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    LOG.info("Testing missing table finished successfully.");
-  }
-
-  @Test
-  public void testEmptyChannelWithDefaults() throws Exception {
-    testEventsWithDefaults(0);
-  }
-
-  @Test
-  public void testOneEventWithDefaults() throws Exception {
-    testEventsWithDefaults(1);
-  }
-
-  @Test
-  public void testThreeEventsWithDefaults() throws Exception {
-    testEventsWithDefaults(3);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
-    doTestDuplicateRows(true);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
-    doTestDuplicateRows(false);
-  }
-
-  /**
-   * Puts two events with the same body (and therefore the same key) into the channel and
-   * runs the sink once.
-   *
-   * When duplicates are ignored, process() must return READY and the table must hold
-   * exactly one row. Otherwise process() must throw EventDeliveryException.
-   *
-   * @param ignoreDuplicateRows value for KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS
-   */
-  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
-    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
-    String tableName = table.getName();
-    Context sinkContext = new Context();
-    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
-                    Boolean.toString(ignoreDuplicateRows));
-    KuduSink sink = createSink(tableName, sinkContext);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < 2; i++) {
-      Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    // Only process() can raise EventDeliveryException, so the try covers just that call.
-    Sink.Status status;
-    try {
-      status = sink.process();
-    } catch (EventDeliveryException e) {
-      if (ignoreDuplicateRows) {
-        throw new AssertionError("Failed to ignore duplicate rows!", e);
-      }
-      LOG.info("Correctly did not ignore duplicate rows", e);
-      return;
-    }
-
-    if (!ignoreDuplicateRows) {
-      fail("Incorrectly ignored duplicate rows!");
-    }
-    assertEquals("incorrect status for non-empty channel", Sink.Status.READY, status);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals("1 row expected", 1, rows.size());
-
-    LOG.info("Testing duplicate events finished successfully.");
-  }
-
-  /**
-   * Sends {@code eventCount} events through a sink that has default settings.
-   *
-   * Checks the status returned by process(): BACKOFF for an empty channel, anything else
-   * otherwise. Then checks that one row per event, carrying the expected payload, was
-   * written.
-   *
-   * @param eventCount number of events to put into the channel
-   */
-  private void testEventsWithDefaults(int eventCount) throws Exception {
-    LOG.info("Testing {} events...", eventCount);
-
-    KuduTable table = createNewTable("test" + eventCount + "events");
-    String tableName = table.getName();
-    KuduSink sink = createSink(tableName);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < eventCount; i++) {
-      // Encode explicitly; String.getBytes() would depend on the platform default charset.
-      Event e = EventBuilder.withBody(String.format("payload body %s", i), Charsets.UTF_8);
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertEquals("incorrect status for empty channel", Sink.Status.BACKOFF, status);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    // NOTE(review): assumes scanTableToStrings returns rows in key order. The byte order of
-    // "payload body <i>" matches i only while eventCount <= 10.
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-
-  /** Creates a sink for {@code tableName} with no extra configuration. */
-  private KuduSink createSink(String tableName) {
-    return createSink(tableName, new Context());
-  }
-
-  /**
-   * Creates and configures a sink that targets {@code tableName} on the test cluster's
-   * masters.
-   *
-   * @param tableName table the sink writes to
-   * @param ctx extra sink properties; they are applied last, so they override the table
-   *            name and master addresses set here
-   * @return the configured (not yet started) sink
-   */
-  private KuduSink createSink(String tableName, Context ctx) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
-    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
new file mode 100644
index 0000000..05c18f2
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.apache.hadoop.conf.Configuration;
+import org.kududb.client.KuduClient;
+
+/**
+ * Utility class that manages configurations common to all MR jobs. For example,
+ * any job that uses {@code KuduTableMapReduceUtil} to set up an input or output format
+ * and that has parsed the command line arguments with
+ * {@link org.apache.hadoop.util.GenericOptionsParser} can simply be passed:
+ * <code>
+ * -Dkudu.master.addresses=ADDRESSES
+ * </code>
+ * in order to specify where the masters are.
+ * Use {@link CommandLineParser#getHelpSnippet()} to provide usage text for the configurations
+ * managed by this class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class CommandLineParser {
+  public static final String MASTER_ADDRESSES_KEY = "kudu.master.addresses";
+  public static final String MASTER_ADDRESSES_DEFAULT = "127.0.0.1";
+  public static final String OPERATION_TIMEOUT_MS_KEY = "kudu.operation.timeout.ms";
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT =
+      AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
+  public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
+  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
+      AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
+  public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
+  public static final int NUM_REPLICAS_DEFAULT = 3;
+
+  private final Configuration conf;
+
+  /**
+   * Constructor that uses a Configuration that has already been through
+   * {@link org.apache.hadoop.util.GenericOptionsParser}'s command line parsing.
+   * @param conf the configuration from which job configurations will be extracted
+   */
+  public CommandLineParser(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Gets the master addresses set under {@link #MASTER_ADDRESSES_KEY}.
+   * @return the configured addresses, or {@link #MASTER_ADDRESSES_DEFAULT} if unset
+   */
+  public String getMasterAddresses() {
+    return conf.get(MASTER_ADDRESSES_KEY, MASTER_ADDRESSES_DEFAULT);
+  }
+
+  /**
+   * Gets the timeout for operations on sessions and scanners, set under
+   * {@link #OPERATION_TIMEOUT_MS_KEY}.
+   * @return the timeout in milliseconds, or {@link #OPERATION_TIMEOUT_MS_DEFAULT} if unset
+   */
+  public long getOperationTimeoutMs() {
+    return conf.getLong(OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Gets the timeout for admin operations, set under {@link #ADMIN_OPERATION_TIMEOUT_MS_KEY}.
+   * There is no separate admin default: an unset value falls back to
+   * {@link #OPERATION_TIMEOUT_MS_DEFAULT}.
+   * @return the timeout in milliseconds
+   */
+  public long getAdminOperationTimeoutMs() {
+    return conf.getLong(ADMIN_OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Gets the timeout for socket reads, set under {@link #SOCKET_READ_TIMEOUT_MS_KEY}.
+   * @return the timeout in milliseconds, or {@link #SOCKET_READ_TIMEOUT_MS_DEFAULT} if unset
+   */
+  public long getSocketReadTimeoutMs() {
+    return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
+  }
+
+  /**
+   * Gets the number of replicas to use when creating a new table, set under
+   * {@link #NUM_REPLICAS_KEY}.
+   * @return the configured replica count, or {@link #NUM_REPLICAS_DEFAULT} if unset
+   */
+  public int getNumReplicas() {
+    return conf.getInt(NUM_REPLICAS_KEY, NUM_REPLICAS_DEFAULT);
+  }
+
+  /**
+   * Builds a new async client for the configured master(s), using the configured
+   * operation, admin-operation and socket-read timeouts. Each call builds a new client.
+   * NOTE(review): the caller presumably owns the client and should shut it down.
+   * @return an async kudu client
+   */
+  public AsyncKuduClient getAsyncClient() {
+    return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
+        .defaultOperationTimeoutMs(getOperationTimeoutMs())
+        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
+        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
+        .build();
+  }
+
+  /**
+   * Builds a new synchronous client for the configured master(s), using the configured
+   * operation, admin-operation and socket-read timeouts. Each call builds a new client.
+   * NOTE(review): the caller presumably owns the client and should shut it down.
+   * @return a kudu client
+   */
+  public KuduClient getClient() {
+    return new KuduClient.KuduClientBuilder(getMasterAddresses())
+        .defaultOperationTimeoutMs(getOperationTimeoutMs())
+        .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
+        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
+        .build();
+  }
+
+  /**
+   * Returns a multi-line help snippet to append to the tail of a usage() or help() type
+   * of method. It lists one option per line, each with its default value.
+   * @return a string with all the available configurations and their defaults
+   */
+  public static String getHelpSnippet() {
+    // Each option starts on its own line, including the first one after the header.
+    return "\nAdditionally, the following options are available:\n" +
+        " -D" + OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for read and write " +
+        "operations, defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n" +
+        " -D" + ADMIN_OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for admin operations, " +
+        "defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n" +
+        " -D" + SOCKET_READ_TIMEOUT_MS_KEY + "=TIME - timeout for socket reads, " +
+        "defaults to " + SOCKET_READ_TIMEOUT_MS_DEFAULT + " \n" +
+        " -D" + MASTER_ADDRESSES_KEY + "=ADDRESSES - addresses to reach the Masters, " +
+        "defaults to " + MASTER_ADDRESSES_DEFAULT + " which is usually wrong.\n" +
+        " -D" + NUM_REPLICAS_KEY + "=NUM - number of replicas to use when configuring a new " +
+        "table, defaults to " + NUM_REPLICAS_DEFAULT;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
new file mode 100644
index 0000000..57593db
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/JarFinder.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.base.Preconditions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * Finds the Jar for a class. If the class is in a directory in the
+ * classpath, it creates a Jar on the fly with the contents of the directory
+ * and returns the path to that Jar. If a Jar is created, it is created in
+ * the system temporary directory.
+ *
+ * This file was forked from hbase/branches/master@4ce6f48.
+ */
+public class JarFinder {
+
+ private static void copyToZipStream(File file, ZipEntry entry,
+ ZipOutputStream zos) throws IOException {
+ InputStream is = new FileInputStream(file);
+ try {
+ zos.putNextEntry(entry);
+ byte[] arr = new byte[4096];
+ int read = is.read(arr);
+ while (read > -1) {
+ zos.write(arr, 0, read);
+ read = is.read(arr);
+ }
+ } finally {
+ try {
+ is.close();
+ } finally {
+ zos.closeEntry();
+ }
+ }
+ }
+
+ public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
+ throws IOException {
+ Preconditions.checkNotNull(relativePath, "relativePath");
+ Preconditions.checkNotNull(zos, "zos");
+
+ // by JAR spec, if there is a manifest, it must be the first entry in the
+ // ZIP.
+ File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+ ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
+ if (!manifestFile.exists()) {
+ zos.putNextEntry(manifestEntry);
+ new Manifest().write(new BufferedOutputStream(zos));
+ zos.closeEntry();
+ } else {
+ copyToZipStream(manifestFile, manifestEntry, zos);
+ }
+ zos.closeEntry();
+ zipDir(dir, relativePath, zos, true);
+ zos.close();
+ }
+
+ private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
+ boolean start) throws IOException {
+ String[] dirList = dir.list();
+ for (String aDirList : dirList) {
+ File f = new File(dir, aDirList);
+ if (!f.isHidden()) {
+ if (f.isDirectory()) {
+ if (!start) {
+ ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
+ zos.putNextEntry(dirEntry);
+ zos.closeEntry();
+ }
+ String filePath = f.getPath();
+ File file = new File(filePath);
+ zipDir(file, relativePath + f.getName() + "/", zos, false);
+ }
+ else {
+ String path = relativePath + f.getName();
+ if (!path.equals(JarFile.MANIFEST_NAME)) {
+ ZipEntry anEntry = new ZipEntry(path);
+ copyToZipStream(f, anEntry, zos);
+ }
+ }
+ }
+ }
+ }
+
+ private static void createJar(File dir, File jarFile) throws IOException {
+ Preconditions.checkNotNull(dir, "dir");
+ Preconditions.checkNotNull(jarFile, "jarFile");
+ File jarDir = jarFile.getParentFile();
+ if (!jarDir.exists()) {
+ if (!jarDir.mkdirs()) {
+ throw new IOException(MessageFormat.format("could not create dir [{0}]",
+ jarDir));
+ }
+ }
+ JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile));
+ jarDir(dir, "", zos);
+ }
+
+ /**
+ * Returns the full path to the Jar containing the class. It always returns a
+ * JAR.
+ *
+ * @param klass class.
+ *
+ * @return path to the Jar containing the class.
+ */
+ public static String getJar(Class klass) {
+ Preconditions.checkNotNull(klass, "klass");
+ ClassLoader loader = klass.getClassLoader();
+ if (loader != null) {
+ String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
+ try {
+ for (Enumeration itr = loader.getResources(class_file);
+ itr.hasMoreElements(); ) {
+ URL url = (URL) itr.nextElement();
+ String path = url.getPath();
+ if (path.startsWith("file:")) {
+ path = path.substring("file:".length());
+ }
+ path = URLDecoder.decode(path, "UTF-8");
+ if ("jar".equals(url.getProtocol())) {
+ path = URLDecoder.decode(path, "UTF-8");
+ return path.replaceAll("!.*$", "");
+ }
+ else if ("file".equals(url.getProtocol())) {
+ String klassName = klass.getName();
+ klassName = klassName.replace(".", "/") + ".class";
+ path = path.substring(0, path.length() - klassName.length());
+ File baseDir = new File(path);
+ File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
+ testDir = testDir.getAbsoluteFile();
+ if (!testDir.exists()) {
+ testDir.mkdirs();
+ }
+ File tempJar = File.createTempFile("hadoop-", "", testDir);
+ tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+ tempJar.deleteOnExit();
+ createJar(baseDir, tempJar);
+ return tempJar.getAbsolutePath();
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
new file mode 100644
index 0000000..25235cb
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -0,0 +1,444 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.kududb.mapreduce;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.naming.NamingException;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
+import org.kududb.Common;
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.Bytes;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduTable;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.KuduScanToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This input format generates one split per tablet and the only location for each split is that
+ * tablet's leader.
+ * </p>
+ *
+ * <p>
+ * Hadoop doesn't have the concept of "closing" the input format so in order to release the
+ * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
+ * the object won't be used again and the AsyncKuduClient is shut down.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
+ implements Configurable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
+
+ /** Job parameter that specifies the input table. */
+ static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
+
+ /** Job parameter that specifies if the scanner should cache blocks or not (default: false). */
+ static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
+
+ /** Job parameter that specifies where the masters are. */
+ static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
+
+ /** Job parameter that specifies how long we wait for operations to complete (default: 10s). */
+ static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
+
+ /** Job parameter that specifies the address for the name server. */
+ static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
+
+ /** Job parameter that specifies the encoded column predicates (may be empty). */
+ static final String ENCODED_PREDICATES_KEY =
+ "kudu.mapreduce.encoded.predicates";
+
+ /**
+ * Job parameter that specifies the column projection as a comma-separated list of column names.
+ *
+ * Not specifying this at all (i.e. setting to null) or setting to the special string
+ * '*' means to project all columns.
+ *
+ * Specifying the empty string means to project no columns (i.e just count the rows).
+ */
+ static final String COLUMN_PROJECTION_KEY = "kudu.mapreduce.column.projection";
+
+ /**
+ * The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
+ * used in order to not do DNS lookups multiple times for each tablet server.
+ */
+ private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
+
+ private Configuration conf;
+ private KuduClient client;
+ private KuduTable table;
+ private long operationTimeoutMs;
+ private String nameServer;
+ private boolean cacheBlocks;
+ private List<String> projectedCols;
+ private List<KuduPredicate> predicates;
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ try {
+ if (table == null) {
+ throw new IOException("No table was provided");
+ }
+
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
+ .setProjectedColumnNames(projectedCols)
+ .cacheBlocks(cacheBlocks)
+ .setTimeout(operationTimeoutMs);
+ for (KuduPredicate predicate : predicates) {
+ tokenBuilder.addPredicate(predicate);
+ }
+ List<KuduScanToken> tokens = tokenBuilder.build();
+
+ List<InputSplit> splits = new ArrayList<>(tokens.size());
+ for (KuduScanToken token : tokens) {
+ List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+ for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+ locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
+ }
+ splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
+ }
+ return splits;
+ } finally {
+ shutdownClient();
+ }
+ }
+
+ private void shutdownClient() throws IOException {
+ try {
+ client.shutdown();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * This method might seem alien, but we do this in order to resolve the hostnames the same way
+ * Hadoop does. This ensures we get locality if Kudu is running along MR/YARN.
+ * @param host hostname we got from the master
+ * @param port port we got from the master
+ * @return reverse DNS'd address
+ */
+ private String reverseDNS(String host, Integer port) {
+ String location = this.reverseDNSCacheMap.get(host);
+ if (location != null) {
+ return location;
+ }
+ // The below InetSocketAddress creation does a name resolution.
+ InetSocketAddress isa = new InetSocketAddress(host, port);
+ if (isa.isUnresolved()) {
+ LOG.warn("Failed address resolve for: " + isa);
+ }
+ InetAddress tabletInetAddress = isa.getAddress();
+ try {
+ location = domainNamePointerToHostName(
+ DNS.reverseDns(tabletInetAddress, this.nameServer));
+ this.reverseDNSCacheMap.put(host, location);
+ } catch (NamingException e) {
+ LOG.warn("Cannot resolve the host name for " + tabletInetAddress + " because of " + e);
+ location = host;
+ }
+ return location;
+ }
+
+ @Override
+ public RecordReader<NullWritable, RowResult> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new TableRecordReader();
+ }
+
+ @Override
+ public void setConf(Configuration entries) {
+ this.conf = new Configuration(entries);
+
+ String tableName = conf.get(INPUT_TABLE_KEY);
+ String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
+ this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+ AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+ this.nameServer = conf.get(NAME_SERVER_KEY);
+ this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
+
+ this.client = new KuduClient.KuduClientBuilder(masterAddresses)
+ .defaultOperationTimeoutMs(operationTimeoutMs)
+ .build();
+ try {
+ this.table = client.openTable(tableName);
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not obtain the table from the master, " +
+ "is the master running and is this table created? tablename=" + tableName + " and " +
+ "master address= " + masterAddresses, ex);
+ }
+
+ String projectionConfig = conf.get(COLUMN_PROJECTION_KEY);
+ if (projectionConfig == null || projectionConfig.equals("*")) {
+ this.projectedCols = null; // project the whole table
+ } else if ("".equals(projectionConfig)) {
+ this.projectedCols = new ArrayList<>();
+ } else {
+ this.projectedCols = Lists.newArrayList(Splitter.on(',').split(projectionConfig));
+
+ // Verify that the column names are valid -- better to fail with an exception
+ // before we submit the job.
+ Schema tableSchema = table.getSchema();
+ for (String columnName : projectedCols) {
+ if (tableSchema.getColumn(columnName) == null) {
+ throw new IllegalArgumentException("Unknown column " + columnName);
+ }
+ }
+ }
+
+ this.predicates = new ArrayList<>();
+ try {
+ InputStream is =
+ new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
+ while (is.available() > 0) {
+ this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
+ Common.ColumnPredicatePB.parseDelimitedFrom(is)));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("unable to deserialize predicates from the configuration", e);
+ }
+ }
+
+ /**
+ * Given a PTR string generated via reverse DNS lookup, return everything
+ * except the trailing period. Example for host.example.com., return
+ * host.example.com
+ * @param dnPtr a domain name pointer (PTR) string.
+ * @return Sanitized hostname with last period stripped off.
+ *
+ */
+ private static String domainNamePointerToHostName(String dnPtr) {
+ if (dnPtr == null)
+ return null;
+ return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
+
+ /** The scan token that the split will use to scan the Kudu table. */
+ private byte[] scanToken;
+
+ /** The start partition key of the scan. Used for sorting splits. */
+ private byte[] partitionKey;
+
+ /** Tablet server locations which host the tablet to be scanned. */
+ private String[] locations;
+
+ public TableSplit() { } // Writable
+
+ public TableSplit(KuduScanToken token, String[] locations) throws IOException {
+ this.scanToken = token.serialize();
+ this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
+ this.locations = locations;
+ }
+
+ public byte[] getScanToken() {
+ return scanToken;
+ }
+
+ public byte[] getPartitionKey() {
+ return partitionKey;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ // TODO Guesstimate a size
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return locations;
+ }
+
+ @Override
+ public int compareTo(TableSplit other) {
+ return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ Bytes.writeByteArray(dataOutput, scanToken);
+ Bytes.writeByteArray(dataOutput, partitionKey);
+ dataOutput.writeInt(locations.length);
+ for (String location : locations) {
+ byte[] str = Bytes.fromString(location);
+ Bytes.writeByteArray(dataOutput, str);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ scanToken = Bytes.readByteArray(dataInput);
+ partitionKey = Bytes.readByteArray(dataInput);
+ locations = new String[dataInput.readInt()];
+ for (int i = 0; i < locations.length; i++) {
+ byte[] str = Bytes.readByteArray(dataInput);
+ locations[i] = Bytes.getString(str);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ // We currently just care about the partition key since we're within the same table.
+ return Arrays.hashCode(partitionKey);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TableSplit that = (TableSplit) o;
+
+ return this.compareTo(that) == 0;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("partitionKey", Bytes.pretty(partitionKey))
+ .add("locations", Arrays.toString(locations))
+ .toString();
+ }
+ }
+
+ class TableRecordReader extends RecordReader<NullWritable, RowResult> {
+
+ private final NullWritable currentKey = NullWritable.get();
+ private RowResult currentValue;
+ private RowResultIterator iterator;
+ private KuduScanner scanner;
+ private TableSplit split;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (!(inputSplit instanceof TableSplit)) {
+ throw new IllegalArgumentException("TableSplit is the only accepted input split");
+ }
+
+ split = (TableSplit) inputSplit;
+
+ try {
+ scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ // Calling this now to set iterator.
+ tryRefreshIterator();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!iterator.hasNext()) {
+ tryRefreshIterator();
+ if (!iterator.hasNext()) {
+ // Means we still have the same iterator, we're done
+ return false;
+ }
+ }
+ currentValue = iterator.next();
+ return true;
+ }
+
+ /**
+ * If the scanner has more rows, get a new iterator else don't do anything.
+ * @throws IOException
+ */
+ private void tryRefreshIterator() throws IOException {
+ if (!scanner.hasMoreRows()) {
+ return;
+ }
+ try {
+ iterator = scanner.nextRows();
+ } catch (Exception e) {
+ throw new IOException("Couldn't get scan data", e);
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return currentKey;
+ }
+
+ @Override
+ public RowResult getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // TODO Guesstimate progress
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ scanner.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ shutdownClient();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
new file mode 100644
index 0000000..0b919d9
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
@@ -0,0 +1,541 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.kududb.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.StringUtils;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.ColumnRangePredicate;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+
+/**
+ * Utility class to setup MR jobs that use Kudu as an input and/or output.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableMapReduceUtil {
+ // Mostly lifted from HBase's TableMapReduceUtil
+
+ private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class);
+
+ /**
+ * Doesn't need instantiation
+ */
+ private KuduTableMapReduceUtil() { }
+
+
+ /**
+ * Base class for MR I/O formats, contains the common configurations.
+ */
+ private static abstract class AbstractMapReduceConfigurator<S> {
+ protected final Job job;
+ protected final String table;
+
+ protected boolean addDependencies = true;
+
+ /**
+ * Constructor for the required fields to configure.
+ * @param job a job to configure
+ * @param table a string that contains the name of the table to read from
+ */
+ private AbstractMapReduceConfigurator(Job job, String table) {
+ this.job = job;
+ this.table = table;
+ }
+
+ /**
+ * Sets whether this job should add Kudu's dependencies to the distributed cache. Turned on
+ * by default.
+ * @param addDependencies a boolean that says if we should add the dependencies
+ * @return this instance
+ */
+ @SuppressWarnings("unchecked")
+ public S addDependencies(boolean addDependencies) {
+ this.addDependencies = addDependencies;
+ return (S) this;
+ }
+
+ /**
+ * Configures the job using the passed parameters.
+ * @throws IOException If addDependencies is enabled and a problem is encountered reading
+ * files on the filesystem
+ */
+ public abstract void configure() throws IOException;
+ }
+
+ /**
+ * Builder-like class that sets up the required configurations and classes to write to Kudu.
+ * <p>
+ * Use either child classes when configuring the table output format.
+ */
+ private static abstract class AbstractTableOutputFormatConfigurator
+ <S extends AbstractTableOutputFormatConfigurator<? super S>>
+ extends AbstractMapReduceConfigurator<S> {
+
+ protected String masterAddresses;
+ protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+
+ /**
+ * {@inheritDoc}
+ */
+ private AbstractTableOutputFormatConfigurator(Job job, String table) {
+ super(job, table);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void configure() throws IOException {
+ job.setOutputFormatClass(KuduTableOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Operation.class);
+
+ Configuration conf = job.getConfiguration();
+ conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
+ conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, table);
+ conf.setLong(KuduTableOutputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
+ if (addDependencies) {
+ addDependencyJars(job);
+ }
+ }
+ }
+
+ /**
+ * Builder-like class that sets up the required configurations and classes to read from Kudu.
+ * By default, block caching is disabled.
+ * <p>
+ * Use either child classes when configuring the table input format.
+ */
+ private static abstract class AbstractTableInputFormatConfigurator
+ <S extends AbstractTableInputFormatConfigurator<? super S>>
+ extends AbstractMapReduceConfigurator<S> {
+
+ protected String masterAddresses;
+ protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+ protected final String columnProjection;
+ protected boolean cacheBlocks;
+ protected List<KuduPredicate> predicates = new ArrayList<>();
+
+ /**
+ * Constructor for the required fields to configure.
+ * @param job a job to configure
+ * @param table a string that contains the name of the table to read from
+ * @param columnProjection a string containing a comma-separated list of columns to read.
+ * It can be null in which case we read empty rows
+ */
+ private AbstractTableInputFormatConfigurator(Job job, String table, String columnProjection) {
+ super(job, table);
+ this.columnProjection = columnProjection;
+ }
+
+ /**
+ * Sets the block caching configuration for the scanners. Turned off by default.
+ * @param cacheBlocks whether the job should use scanners that cache blocks.
+ * @return this instance
+ */
+ public S cacheBlocks(boolean cacheBlocks) {
+ this.cacheBlocks = cacheBlocks;
+ return (S) this;
+ }
+
+ /**
+ * Configures the job with all the passed parameters.
+ * @throws IOException If addDependencies is enabled and a problem is encountered reading
+ * files on the filesystem
+ */
+ public void configure() throws IOException {
+ job.setInputFormatClass(KuduTableInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+
+ conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
+ conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, table);
+ conf.setLong(KuduTableInputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
+ conf.setBoolean(KuduTableInputFormat.SCAN_CACHE_BLOCKS, cacheBlocks);
+
+ if (columnProjection != null) {
+ conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
+ }
+
+ conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
+
+ if (addDependencies) {
+ addDependencyJars(job);
+ }
+ }
+ }
+
+ /**
+ * Returns the provided predicates as a Base64 encoded string.
+ * @param predicates the predicates to encode
+ * @return the encoded predicates
+ */
+ static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ for (KuduPredicate predicate : predicates) {
+ predicate.toPB().writeDelimitedTo(baos);
+ }
+ return Base64.encodeBase64String(baos.toByteArray());
+ }
+
+
+ /**
+ * Table output format configurator to use to specify the parameters directly.
+ */
+ public static class TableOutputFormatConfigurator
+ extends AbstractTableOutputFormatConfigurator<TableOutputFormatConfigurator> {
+
+ /**
+ * Constructor for the required fields to configure.
+ * @param job a job to configure
+ * @param table a string that contains the name of the table to read from
+ * @param masterAddresses a comma-separated list of masters' hosts and ports
+ */
+ public TableOutputFormatConfigurator(Job job, String table, String masterAddresses) {
+ super(job, table);
+ this.masterAddresses = masterAddresses;
+ }
+
+ /**
+ * Sets the timeout for all the operations. The default is 10 seconds.
+ * @param operationTimeoutMs a long that represents the timeout for operations to complete,
+ * must be a positive value or 0
+ * @return this instance
+ * @throws IllegalArgumentException if the operation timeout is lower than 0
+ */
+ public TableOutputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
+ if (operationTimeoutMs < 0) {
+ throw new IllegalArgumentException("The operation timeout must be => 0, " +
+ "passed value is: " + operationTimeoutMs);
+ }
+ this.operationTimeoutMs = operationTimeoutMs;
+ return this;
+ }
+ }
+
+ /**
+ * Table output format that uses a {@link CommandLineParser} in order to set the
+ * master config and the operation timeout.
+ */
+ public static class TableOutputFormatConfiguratorWithCommandLineParser extends
+ AbstractTableOutputFormatConfigurator<TableOutputFormatConfiguratorWithCommandLineParser> {
+
+ /**
+ * {@inheritDoc}
+ */
+ public TableOutputFormatConfiguratorWithCommandLineParser(Job job, String table) {
+ super(job, table);
+ CommandLineParser parser = new CommandLineParser(job.getConfiguration());
+ this.masterAddresses = parser.getMasterAddresses();
+ this.operationTimeoutMs = parser.getOperationTimeoutMs();
+ }
+ }
+
+ /**
+ * Table input format configurator to use to specify the parameters directly.
+ */
+ public static class TableInputFormatConfigurator
+ extends AbstractTableInputFormatConfigurator<TableInputFormatConfigurator> {
+
+ /**
+ * Constructor for the required fields to configure.
+ * @param job a job to configure
+ * @param table a string that contains the name of the table to read from
+ * @param columnProjection a string containing a comma-separated list of columns to read.
+ * It can be null in which case we read empty rows
+ * @param masterAddresses a comma-separated list of masters' hosts and ports
+ */
+ public TableInputFormatConfigurator(Job job, String table, String columnProjection,
+ String masterAddresses) {
+ super(job, table, columnProjection);
+ this.masterAddresses = masterAddresses;
+ }
+
+ /**
+ * Sets the timeout for all the operations. The default is 10 seconds.
+ * @param operationTimeoutMs a long that represents the timeout for operations to complete,
+ * must be a positive value or 0
+ * @return this instance
+ * @throws IllegalArgumentException if the operation timeout is lower than 0
+ */
+ public TableInputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
+ if (operationTimeoutMs < 0) {
+ throw new IllegalArgumentException("The operation timeout must be => 0, " +
+ "passed value is: " + operationTimeoutMs);
+ }
+ this.operationTimeoutMs = operationTimeoutMs;
+ return this;
+ }
+
+ /**
+ * Adds a new predicate that will be pushed down to all the tablets.
+ * @param predicate a predicate to add
+ * @return this instance
+ * @deprecated use {@link #addPredicate}
+ */
+ @Deprecated
+ public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
+ return addPredicate(predicate.toKuduPredicate());
+ }
+
+ /**
+ * Adds a new predicate that will be pushed down to all the tablets.
+ * @param predicate a predicate to add
+ * @return this instance
+ */
+ public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
+ this.predicates.add(predicate);
+ return this;
+ }
+ }
+
+ /**
+ * Table input format that uses a {@link CommandLineParser} in order to set the
+ * master config and the operation timeout.
+ * This version cannot set column range predicates.
+ */
+ public static class TableInputFormatConfiguratorWithCommandLineParser extends
+ AbstractTableInputFormatConfigurator<TableInputFormatConfiguratorWithCommandLineParser> {
+
+ /**
+ * {@inheritDoc}
+ */
+ public TableInputFormatConfiguratorWithCommandLineParser(Job job,
+ String table,
+ String columnProjection) {
+ super(job, table, columnProjection);
+ CommandLineParser parser = new CommandLineParser(job.getConfiguration());
+ this.masterAddresses = parser.getMasterAddresses();
+ this.operationTimeoutMs = parser.getOperationTimeoutMs();
+ }
+ }
+
+ /**
+ * Use this method when setting up a task to get access to the KuduTable in order to create
+ * Inserts, Updates, and Deletes.
+ * @param context Map context
+ * @return The kudu table object as setup by the output format
+ */
+ @SuppressWarnings("rawtypes")
+ public static KuduTable getTableFromContext(TaskInputOutputContext context) {
+ String multitonKey = context.getConfiguration().get(KuduTableOutputFormat.MULTITON_KEY);
+ return KuduTableOutputFormat.getKuduTable(multitonKey);
+ }
+
+ /**
+ * Add the Kudu dependency jars as well as jars for any of the configured
+ * job classes to the job configuration, so that JobClient will ship them
+ * to the cluster and add them to the DistributedCache.
+ */
+ public static void addDependencyJars(Job job) throws IOException {
+ addKuduDependencyJars(job.getConfiguration());
+ try {
+ addDependencyJars(job.getConfiguration(),
+ // when making changes here, consider also mapred.TableMapReduceUtil
+ // pull job classes
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ job.getInputFormatClass(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass(),
+ job.getOutputFormatClass(),
+ job.getPartitionerClass(),
+ job.getCombinerClass());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Add the jars containing the given classes to the job's configuration
+ * such that JobClient will ship them to the cluster and add them to
+ * the DistributedCache.
+ */
+ public static void addDependencyJars(Configuration conf,
+ Class<?>... classes) throws IOException {
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Set<String> jars = new HashSet<String>();
+ // Add jars that are already in the tmpjars variable
+ jars.addAll(conf.getStringCollection("tmpjars"));
+
+ // add jars as we find them to a map of contents jar name so that we can avoid
+ // creating new jars for classes that have already been packaged.
+ Map<String, String> packagedClasses = new HashMap<String, String>();
+
+ // Add jars containing the specified classes
+ for (Class<?> clazz : classes) {
+ if (clazz == null) continue;
+
+ Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+ if (path == null) {
+ LOG.warn("Could not find jar for class " + clazz +
+ " in order to ship it to the cluster.");
+ continue;
+ }
+ if (!localFs.exists(path)) {
+ LOG.warn("Could not validate jar file " + path + " for class "
+ + clazz);
+ continue;
+ }
+ jars.add(path.toString());
+ }
+ if (jars.isEmpty()) return;
+
+ conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+ }
+
+ /**
+ * Add Kudu and its dependencies (only) to the job configuration.
+ * <p>
+ * This is intended as a low-level API, facilitating code reuse between this
+ * class and its mapred counterpart. It also of use to external tools that
+ * need to build a MapReduce job that interacts with Kudu but want
+ * fine-grained control over the jars shipped to the cluster.
+ * </p>
+ * @param conf The Configuration object to extend with dependencies.
+ * @see KuduTableMapReduceUtil
+ * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
+ */
+ public static void addKuduDependencyJars(Configuration conf) throws IOException {
+ addDependencyJars(conf,
+ // explicitly pull a class from each module
+ Operation.class, // kudu-client
+ KuduTableMapReduceUtil.class, // kudu-mapreduce
+ // pull necessary dependencies
+ com.stumbleupon.async.Deferred.class);
+ }
+
+ /**
+ * Finds the Jar for a class or creates it if it doesn't exist. If the class
+ * is in a directory in the classpath, it creates a Jar on the fly with the
+ * contents of the directory and returns the path to that Jar. If a Jar is
+ * created, it is created in the system temporary directory. Otherwise,
+ * returns an existing jar that contains a class of the same name. Maintains
+ * a mapping from jar contents to the tmp jar created.
+ * @param my_class the class to find.
+ * @param fs the FileSystem with which to qualify the returned path.
+ * @param packagedClasses a map of class name to path.
+ * @return a jar file that contains the class.
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+ Map<String, String> packagedClasses)
+ throws IOException {
+ // attempt to locate an existing jar for the class.
+ String jar = findContainingJar(my_class, packagedClasses);
+ if (null == jar || jar.isEmpty()) {
+ jar = JarFinder.getJar(my_class);
+ updateMap(jar, packagedClasses);
+ }
+
+ if (null == jar || jar.isEmpty()) {
+ return null;
+ }
+
+ LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
+ return new Path(jar).makeQualified(fs);
+ }
+
+ /**
+ * Find a jar that contains a class of the same name, if any. It will return
+ * a jar file, even if that is not the first thing on the class path that
+ * has a class with the same name. Looks first on the classpath and then in
+ * the <code>packagedClasses</code> map.
+ * @param my_class the class to find.
+ * @return a jar file that contains the class, or null.
+ * @throws IOException
+ */
+ private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+ throws IOException {
+ ClassLoader loader = my_class.getClassLoader();
+ String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+ // first search the classpath
+ for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+ URL url = itr.nextElement();
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ // URLDecoder is a misnamed class, since it actually decodes
+ // x-www-form-urlencoded MIME type rather than actual
+ // URL encoding (which the file path has). Therefore it would
+ // decode +s to ' 's which is incorrect (spaces are actually
+ // either unencoded or encoded as "%20"). Replace +s first, so
+ // that they are kept sacred during the decoding process.
+ toReturn = toReturn.replaceAll("\\+", "%2B");
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+
+ // now look in any jars we've packaged using JarFinder. Returns null when
+ // no jar is found.
+ return packagedClasses.get(class_file);
+ }
+
+ /**
+ * Add entries to <code>packagedClasses</code> corresponding to class files
+ * contained in <code>jar</code>.
+ * @param jar The jar who's content to list.
+ * @param packagedClasses map[class -> jar]
+ */
+ private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
+ if (null == jar || jar.isEmpty()) {
+ return;
+ }
+ ZipFile zip = null;
+ try {
+ zip = new ZipFile(jar);
+ for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+ ZipEntry entry = iter.nextElement();
+ if (entry.getName().endsWith("class")) {
+ packagedClasses.put(entry.getName(), jar);
+ }
+ }
+ } finally {
+ if (null != zip) zip.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
new file mode 100644
index 0000000..8af750b
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputCommitter.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * Small committer class that does not do anything.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableOutputCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
new file mode 100644
index 0000000..e80b73f
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p>
+ * Use {@link
+ * KuduTableMapReduceUtil.TableOutputFormatConfigurator}
+ * to correctly setup this output format, then {@link
+ * KuduTableMapReduceUtil#getTableFromContext(org.apache.hadoop.mapreduce.TaskInputOutputContext)}
+ * to get a KuduTable.
+ * </p>
+ *
+ * <p>
+ * Hadoop doesn't have the concept of "closing" the output format so in order to release the
+ * resources we assume that once either
+ * {@link #checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link TableRecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)}
+ * have been called that the object won't be used again and the KuduClient is shut down.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation>
+ implements Configurable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTableOutputFormat.class);
+
+ /** Job parameter that specifies the output table. */
+ static final String OUTPUT_TABLE_KEY = "kudu.mapreduce.output.table";
+
+ /** Job parameter that specifies where the masters are */
+ static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.addresses";
+
+ /** Job parameter that specifies how long we wait for operations to complete */
+ static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
+
+ /** Number of rows that are buffered before flushing to the tablet server */
+ static final String BUFFER_ROW_COUNT_KEY = "kudu.mapreduce.buffer.row.count";
+
+ /**
+ * Job parameter that specifies which key is to be used to reach the KuduTableOutputFormat
+ * belonging to the caller
+ */
+ static final String MULTITON_KEY = "kudu.mapreduce.multitonkey";
+
+ /**
+ * This multiton is used so that the tasks using this output format/record writer can find
+ * their KuduTable without having a direct dependency on this class,
+ * with the additional complexity that the output format cannot be shared between threads.
+ */
+ private static final ConcurrentHashMap<String, KuduTableOutputFormat> MULTITON = new
+ ConcurrentHashMap<String, KuduTableOutputFormat>();
+
+ /**
+ * This counter helps indicate which task log to look at since rows that weren't applied will
+ * increment this counter.
+ */
+ public enum Counters { ROWS_WITH_ERRORS }
+
+ private Configuration conf = null;
+
+ private KuduClient client;
+ private KuduTable table;
+ private KuduSession session;
+ private long operationTimeoutMs;
+
+ @Override
+ public void setConf(Configuration entries) {
+ this.conf = new Configuration(entries);
+
+ String masterAddress = this.conf.get(MASTER_ADDRESSES_KEY);
+ String tableName = this.conf.get(OUTPUT_TABLE_KEY);
+ this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+ AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+ int bufferSpace = this.conf.getInt(BUFFER_ROW_COUNT_KEY, 1000);
+
+ this.client = new KuduClient.KuduClientBuilder(masterAddress)
+ .defaultOperationTimeoutMs(operationTimeoutMs)
+ .build();
+ try {
+ this.table = client.openTable(tableName);
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not obtain the table from the master, " +
+ "is the master running and is this table created? tablename=" + tableName + " and " +
+ "master address= " + masterAddress, ex);
+ }
+ this.session = client.newSession();
+ this.session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+ this.session.setMutationBufferSpace(bufferSpace);
+ this.session.setIgnoreAllDuplicateRows(true);
+ String multitonKey = String.valueOf(Thread.currentThread().getId());
+ assert(MULTITON.get(multitonKey) == null);
+ MULTITON.put(multitonKey, this);
+ entries.set(MULTITON_KEY, multitonKey);
+ }
+
+ private void shutdownClient() throws IOException {
+ try {
+ client.shutdown();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static KuduTable getKuduTable(String multitonKey) {
+ return MULTITON.get(multitonKey).getKuduTable();
+ }
+
+ private KuduTable getKuduTable() {
+ return this.table;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public RecordWriter<NullWritable, Operation> getRecordWriter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new TableRecordWriter(this.session);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ shutdownClient();
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws
+ IOException, InterruptedException {
+ return new KuduTableOutputCommitter();
+ }
+
+ protected class TableRecordWriter extends RecordWriter<NullWritable, Operation> {
+
+ private final AtomicLong rowsWithErrors = new AtomicLong();
+ private final KuduSession session;
+
+ public TableRecordWriter(KuduSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public void write(NullWritable key, Operation operation)
+ throws IOException, InterruptedException {
+ try {
+ session.apply(operation);
+ } catch (Exception e) {
+ throw new IOException("Encountered an error while writing", e);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException,
+ InterruptedException {
+ try {
+ processRowErrors(session.close());
+ shutdownClient();
+ } catch (Exception e) {
+ throw new IOException("Encountered an error while closing this task", e);
+ } finally {
+ if (taskAttemptContext != null) {
+ // This is the only place where we have access to the context in the record writer,
+ // so set the counter here.
+ taskAttemptContext.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
+ }
+ }
+ }
+
+ private void processRowErrors(List<OperationResponse> responses) {
+ List<RowError> errors = OperationResponse.collectErrors(responses);
+ if (!errors.isEmpty()) {
+ int rowErrorsCount = errors.size();
+ rowsWithErrors.addAndGet(rowErrorsCount);
+ LOG.warn("Got per errors for " + rowErrorsCount + " rows, " +
+ "the first one being " + errors.get(0).getStatus());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
new file mode 100644
index 0000000..7cf3ada
--- /dev/null
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/TableReducer.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.Operation;
+import org.apache.hadoop.mapreduce.Reducer;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+ extends Reducer<KEYIN, VALUEIN, KEYOUT, Operation> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
deleted file mode 100644
index 05c18f2..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/CommandLineParser.java
+++ /dev/null
@@ -1,144 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.apache.hadoop.conf.Configuration;
-import org.kududb.client.KuduClient;
-
-/**
- * Utility class that manages common configurations to all MR jobs. For example,
- * any job that uses {#KuduTableMapReduceUtil} to setup an input or output format
- * and that has parsed the command line arguments with
- * {@link org.apache.hadoop.util.GenericOptionsParser} can simply be passed:
- * <code>
- * -Dmaster.address=ADDRESS
- * </code>
- * in order to specify where the master is.
- * Use {@link CommandLineParser#getHelpSnippet()} to provide usage text for the configurations
- * managed by this class.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class CommandLineParser {
- private final Configuration conf;
- public static final String MASTER_ADDRESSES_KEY = "kudu.master.addresses";
- public static final String MASTER_ADDRESSES_DEFAULT = "127.0.0.1";
- public static final String OPERATION_TIMEOUT_MS_KEY = "kudu.operation.timeout.ms";
- public static final long OPERATION_TIMEOUT_MS_DEFAULT =
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
- public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
- public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
- public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
- AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
- public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
- public static final int NUM_REPLICAS_DEFAULT = 3;
-
- /**
- * Constructor that uses a Configuration that has already been through
- * {@link org.apache.hadoop.util.GenericOptionsParser}'s command line parsing.
- * @param conf the configuration from which job configurations will be extracted
- */
- public CommandLineParser(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Get the configured master's config.
- * @return a string that contains the passed config, or the default value
- */
- public String getMasterAddresses() {
- return conf.get(MASTER_ADDRESSES_KEY, MASTER_ADDRESSES_DEFAULT);
- }
-
- /**
- * Get the configured timeout for operations on sessions and scanners.
- * @return a long that represents the passed timeout, or the default value
- */
- public long getOperationTimeoutMs() {
- return conf.getLong(OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
- }
-
- /**
- * Get the configured timeout for admin operations.
- * @return a long that represents the passed timeout, or the default value
- */
- public long getAdminOperationTimeoutMs() {
- return conf.getLong(ADMIN_OPERATION_TIMEOUT_MS_KEY, OPERATION_TIMEOUT_MS_DEFAULT);
- }
-
- /**
- * Get the configured timeout for socket reads.
- * @return a long that represents the passed timeout, or the default value
- */
- public long getSocketReadTimeoutMs() {
- return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
- }
-
- /**
- * Get the number of replicas to use when configuring a new table.
- * @return an int that represents the passed number of replicas to use, or the default value.
- */
- public int getNumReplicas() {
- return conf.getInt(NUM_REPLICAS_KEY, NUM_REPLICAS_DEFAULT);
- }
-
- /**
- * Get an async client connected to the configured Master(s).
- * @return an async kudu client
- */
- public AsyncKuduClient getAsyncClient() {
- return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
- .defaultOperationTimeoutMs(getOperationTimeoutMs())
- .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
- .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
- .build();
- }
-
- /**
- * Get a client connected to the configured Master(s).
- * @return a kudu client
- */
- public KuduClient getClient() {
- return new KuduClient.KuduClientBuilder(getMasterAddresses())
- .defaultOperationTimeoutMs(getOperationTimeoutMs())
- .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
- .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
- .build();
- }
-
- /**
- * This method returns a single multi-line string that contains the help snippet to append to
- * the tail of a usage() or help() type of method.
- * @return a string with all the available configurations and their defaults
- */
- public static String getHelpSnippet() {
- return "\nAdditionally, the following options are available:" +
- " -D" + OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for read and write " +
- "operations, defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
- " -D" + ADMIN_OPERATION_TIMEOUT_MS_KEY + "=TIME - timeout for admin operations " +
- ", defaults to " + OPERATION_TIMEOUT_MS_DEFAULT + " \n"+
- " -D" + SOCKET_READ_TIMEOUT_MS_KEY + "=TIME - timeout for socket reads " +
- ", defaults to " + SOCKET_READ_TIMEOUT_MS_DEFAULT + " \n"+
- " -D" + MASTER_ADDRESSES_KEY + "=ADDRESSES - addresses to reach the Masters, " +
- "defaults to " + MASTER_ADDRESSES_DEFAULT + " which is usually wrong.\n" +
- " -D " + NUM_REPLICAS_KEY + "=NUM - number of replicas to use when configuring a new " +
- "table, defaults to " + NUM_REPLICAS_DEFAULT;
- }
-}