You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:08 UTC

[03/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
deleted file mode 100644
index 57593db..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import com.google.common.base.Preconditions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.text.MessageFormat;
-import java.util.Enumeration;
-import java.util.jar.JarFile;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-/**
- * Finds the Jar for a class. If the class is in a directory in the
- * classpath, it creates a Jar on the fly with the contents of the directory
- * and returns the path to that Jar. If a Jar is created, it is created in
- * the system temporary directory.
- *
- * This file was forked from hbase/branches/master@4ce6f48.
- */
-public class JarFinder {
-
-  private static void copyToZipStream(File file, ZipEntry entry,
-                                      ZipOutputStream zos) throws IOException {
-    InputStream is = new FileInputStream(file);
-    try {
-      zos.putNextEntry(entry);
-      byte[] arr = new byte[4096];
-      int read = is.read(arr);
-      while (read > -1) {
-        zos.write(arr, 0, read);
-        read = is.read(arr);
-      }
-    } finally {
-      try {
-        is.close();
-      } finally {
-        zos.closeEntry();
-      }
-    }
-  }
-
-  public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
-    throws IOException {
-    Preconditions.checkNotNull(relativePath, "relativePath");
-    Preconditions.checkNotNull(zos, "zos");
-
-    // by JAR spec, if there is a manifest, it must be the first entry in the
-    // ZIP.
-    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
-    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
-    if (!manifestFile.exists()) {
-      zos.putNextEntry(manifestEntry);
-      new Manifest().write(new BufferedOutputStream(zos));
-      zos.closeEntry();
-    } else {
-      copyToZipStream(manifestFile, manifestEntry, zos);
-    }
-    zos.closeEntry();
-    zipDir(dir, relativePath, zos, true);
-    zos.close();
-  }
-
-  private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
-                             boolean start) throws IOException {
-    String[] dirList = dir.list();
-    for (String aDirList : dirList) {
-      File f = new File(dir, aDirList);
-      if (!f.isHidden()) {
-        if (f.isDirectory()) {
-          if (!start) {
-            ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
-            zos.putNextEntry(dirEntry);
-            zos.closeEntry();
-          }
-          String filePath = f.getPath();
-          File file = new File(filePath);
-          zipDir(file, relativePath + f.getName() + "/", zos, false);
-        }
-        else {
-          String path = relativePath + f.getName();
-          if (!path.equals(JarFile.MANIFEST_NAME)) {
-            ZipEntry anEntry = new ZipEntry(path);
-            copyToZipStream(f, anEntry, zos);
-          }
-        }
-      }
-    }
-  }
-
-  private static void createJar(File dir, File jarFile) throws IOException {
-    Preconditions.checkNotNull(dir, "dir");
-    Preconditions.checkNotNull(jarFile, "jarFile");
-    File jarDir = jarFile.getParentFile();
-    if (!jarDir.exists()) {
-      if (!jarDir.mkdirs()) {
-        throw new IOException(MessageFormat.format("could not create dir [{0}]",
-          jarDir));
-      }
-    }
-    JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile));
-    jarDir(dir, "", zos);
-  }
-
-  /**
-   * Returns the full path to the Jar containing the class. It always returns a
-   * JAR.
-   *
-   * @param klass class.
-   *
-   * @return path to the Jar containing the class.
-   */
-  public static String getJar(Class klass) {
-    Preconditions.checkNotNull(klass, "klass");
-    ClassLoader loader = klass.getClassLoader();
-    if (loader != null) {
-      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
-      try {
-        for (Enumeration itr = loader.getResources(class_file);
-             itr.hasMoreElements(); ) {
-          URL url = (URL) itr.nextElement();
-          String path = url.getPath();
-          if (path.startsWith("file:")) {
-            path = path.substring("file:".length());
-          }
-          path = URLDecoder.decode(path, "UTF-8");
-          if ("jar".equals(url.getProtocol())) {
-            path = URLDecoder.decode(path, "UTF-8");
-            return path.replaceAll("!.*$", "");
-          }
-          else if ("file".equals(url.getProtocol())) {
-            String klassName = klass.getName();
-            klassName = klassName.replace(".", "/") + ".class";
-            path = path.substring(0, path.length() - klassName.length());
-            File baseDir = new File(path);
-            File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
-            testDir = testDir.getAbsoluteFile();
-            if (!testDir.exists()) {
-              testDir.mkdirs();
-            }
-            File tempJar = File.createTempFile("hadoop-", "", testDir);
-            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
-            tempJar.deleteOnExit();
-            createJar(baseDir, tempJar);
-            return tempJar.getAbsolutePath();
-          }
-        }
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
deleted file mode 100644
index 25235cb..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-package org.kududb.mapreduce;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.naming.NamingException;
-
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
-import org.kududb.Common;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.kududb.client.Bytes;
-import org.kududb.client.KuduClient;
-import org.kududb.client.KuduPredicate;
-import org.kududb.client.KuduScanner;
-import org.kududb.client.KuduTable;
-import org.kududb.client.LocatedTablet;
-import org.kududb.client.RowResult;
-import org.kududb.client.RowResultIterator;
-import org.kududb.client.KuduScanToken;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * This input format generates one split per tablet and the only location for each split is that
- * tablet's leader.
- * </p>
- *
- * <p>
- * Hadoop doesn't have the concept of "closing" the input format so in order to release the
- * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
- * the object won't be used again and the AsyncKuduClient is shut down.
- * </p>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
-    implements Configurable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
-
-  /** Job parameter that specifies the input table. */
-  static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
-
-  /** Job parameter that specifies if the scanner should cache blocks or not (default: false). */
-  static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
-
-  /** Job parameter that specifies where the masters are. */
-  static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
-
-  /** Job parameter that specifies how long we wait for operations to complete (default: 10s). */
-  static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
-
-  /** Job parameter that specifies the address for the name server. */
-  static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
-
-  /** Job parameter that specifies the encoded column predicates (may be empty). */
-  static final String ENCODED_PREDICATES_KEY =
-      "kudu.mapreduce.encoded.predicates";
-
-  /**
-   * Job parameter that specifies the column projection as a comma-separated list of column names.
-   *
-   * Not specifying this at all (i.e. setting to null) or setting to the special string
-   * '*' means to project all columns.
-   *
-   * Specifying the empty string means to project no columns (i.e just count the rows).
-   */
-  static final String COLUMN_PROJECTION_KEY = "kudu.mapreduce.column.projection";
-
-  /**
-   * The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
-   * used in order to not do DNS lookups multiple times for each tablet server.
-   */
-  private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
-
-  private Configuration conf;
-  private KuduClient client;
-  private KuduTable table;
-  private long operationTimeoutMs;
-  private String nameServer;
-  private boolean cacheBlocks;
-  private List<String> projectedCols;
-  private List<KuduPredicate> predicates;
-
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext)
-      throws IOException, InterruptedException {
-    try {
-      if (table == null) {
-        throw new IOException("No table was provided");
-      }
-
-      KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
-                                                      .setProjectedColumnNames(projectedCols)
-                                                      .cacheBlocks(cacheBlocks)
-                                                      .setTimeout(operationTimeoutMs);
-      for (KuduPredicate predicate : predicates) {
-        tokenBuilder.addPredicate(predicate);
-      }
-      List<KuduScanToken> tokens = tokenBuilder.build();
-
-      List<InputSplit> splits = new ArrayList<>(tokens.size());
-      for (KuduScanToken token : tokens) {
-        List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
-        for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
-          locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
-        }
-        splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
-      }
-      return splits;
-    } finally {
-      shutdownClient();
-    }
-  }
-
-  private void shutdownClient() throws IOException {
-    try {
-      client.shutdown();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * This method might seem alien, but we do this in order to resolve the hostnames the same way
-   * Hadoop does. This ensures we get locality if Kudu is running along MR/YARN.
-   * @param host hostname we got from the master
-   * @param port port we got from the master
-   * @return reverse DNS'd address
-   */
-  private String reverseDNS(String host, Integer port) {
-    String location = this.reverseDNSCacheMap.get(host);
-    if (location != null) {
-      return location;
-    }
-    // The below InetSocketAddress creation does a name resolution.
-    InetSocketAddress isa = new InetSocketAddress(host, port);
-    if (isa.isUnresolved()) {
-      LOG.warn("Failed address resolve for: " + isa);
-    }
-    InetAddress tabletInetAddress = isa.getAddress();
-    try {
-      location = domainNamePointerToHostName(
-          DNS.reverseDns(tabletInetAddress, this.nameServer));
-      this.reverseDNSCacheMap.put(host, location);
-    } catch (NamingException e) {
-      LOG.warn("Cannot resolve the host name for " + tabletInetAddress + " because of " + e);
-      location = host;
-    }
-    return location;
-  }
-
-  @Override
-  public RecordReader<NullWritable, RowResult> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    return new TableRecordReader();
-  }
-
-  @Override
-  public void setConf(Configuration entries) {
-    this.conf = new Configuration(entries);
-
-    String tableName = conf.get(INPUT_TABLE_KEY);
-    String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
-    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
-                                           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
-    this.nameServer = conf.get(NAME_SERVER_KEY);
-    this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
-
-    this.client = new KuduClient.KuduClientBuilder(masterAddresses)
-                                .defaultOperationTimeoutMs(operationTimeoutMs)
-                                .build();
-    try {
-      this.table = client.openTable(tableName);
-    } catch (Exception ex) {
-      throw new RuntimeException("Could not obtain the table from the master, " +
-          "is the master running and is this table created? tablename=" + tableName + " and " +
-          "master address= " + masterAddresses, ex);
-    }
-
-    String projectionConfig = conf.get(COLUMN_PROJECTION_KEY);
-    if (projectionConfig == null || projectionConfig.equals("*")) {
-      this.projectedCols = null; // project the whole table
-    } else if ("".equals(projectionConfig)) {
-      this.projectedCols = new ArrayList<>();
-    } else {
-      this.projectedCols = Lists.newArrayList(Splitter.on(',').split(projectionConfig));
-
-      // Verify that the column names are valid -- better to fail with an exception
-      // before we submit the job.
-      Schema tableSchema = table.getSchema();
-      for (String columnName : projectedCols) {
-        if (tableSchema.getColumn(columnName) == null) {
-          throw new IllegalArgumentException("Unknown column " + columnName);
-        }
-      }
-    }
-
-    this.predicates = new ArrayList<>();
-    try {
-      InputStream is =
-          new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
-      while (is.available() > 0) {
-        this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
-                                                 Common.ColumnPredicatePB.parseDelimitedFrom(is)));
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("unable to deserialize predicates from the configuration", e);
-    }
-  }
-
-  /**
-   * Given a PTR string generated via reverse DNS lookup, return everything
-   * except the trailing period. Example for host.example.com., return
-   * host.example.com
-   * @param dnPtr a domain name pointer (PTR) string.
-   * @return Sanitized hostname with last period stripped off.
-   *
-   */
-  private static String domainNamePointerToHostName(String dnPtr) {
-    if (dnPtr == null)
-      return null;
-    return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
-
-    /** The scan token that the split will use to scan the Kudu table. */
-    private byte[] scanToken;
-
-    /** The start partition key of the scan. Used for sorting splits. */
-    private byte[] partitionKey;
-
-    /** Tablet server locations which host the tablet to be scanned. */
-    private String[] locations;
-
-    public TableSplit() { } // Writable
-
-    public TableSplit(KuduScanToken token, String[] locations) throws IOException {
-      this.scanToken = token.serialize();
-      this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
-      this.locations = locations;
-    }
-
-    public byte[] getScanToken() {
-      return scanToken;
-    }
-
-    public byte[] getPartitionKey() {
-      return partitionKey;
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-      // TODO Guesstimate a size
-      return 0;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      return locations;
-    }
-
-    @Override
-    public int compareTo(TableSplit other) {
-      return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-      Bytes.writeByteArray(dataOutput, scanToken);
-      Bytes.writeByteArray(dataOutput, partitionKey);
-      dataOutput.writeInt(locations.length);
-      for (String location : locations) {
-        byte[] str = Bytes.fromString(location);
-        Bytes.writeByteArray(dataOutput, str);
-      }
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-      scanToken = Bytes.readByteArray(dataInput);
-      partitionKey = Bytes.readByteArray(dataInput);
-      locations = new String[dataInput.readInt()];
-      for (int i = 0; i < locations.length; i++) {
-        byte[] str = Bytes.readByteArray(dataInput);
-        locations[i] = Bytes.getString(str);
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      // We currently just care about the partition key since we're within the same table.
-      return Arrays.hashCode(partitionKey);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      TableSplit that = (TableSplit) o;
-
-      return this.compareTo(that) == 0;
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-                    .add("partitionKey", Bytes.pretty(partitionKey))
-                    .add("locations", Arrays.toString(locations))
-                    .toString();
-    }
-  }
-
-  class TableRecordReader extends RecordReader<NullWritable, RowResult> {
-
-    private final NullWritable currentKey = NullWritable.get();
-    private RowResult currentValue;
-    private RowResultIterator iterator;
-    private KuduScanner scanner;
-    private TableSplit split;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-      if (!(inputSplit instanceof TableSplit)) {
-        throw new IllegalArgumentException("TableSplit is the only accepted input split");
-      }
-
-      split = (TableSplit) inputSplit;
-
-      try {
-        scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-
-      // Calling this now to set iterator.
-      tryRefreshIterator();
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (!iterator.hasNext()) {
-        tryRefreshIterator();
-        if (!iterator.hasNext()) {
-          // Means we still have the same iterator, we're done
-          return false;
-        }
-      }
-      currentValue = iterator.next();
-      return true;
-    }
-
-    /**
-     * If the scanner has more rows, get a new iterator else don't do anything.
-     * @throws IOException
-     */
-    private void tryRefreshIterator() throws IOException {
-      if (!scanner.hasMoreRows()) {
-        return;
-      }
-      try {
-        iterator = scanner.nextRows();
-      } catch (Exception e) {
-        throw new IOException("Couldn't get scan data", e);
-      }
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-      return currentKey;
-    }
-
-    @Override
-    public RowResult getCurrentValue() throws IOException, InterruptedException {
-      return currentValue;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      // TODO Guesstimate progress
-      return 0;
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        scanner.close();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-      shutdownClient();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
deleted file mode 100644
index 0b919d9..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-package org.kududb.mapreduce;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.StringUtils;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.kududb.client.ColumnRangePredicate;
-import org.kududb.client.KuduPredicate;
-import org.kududb.client.KuduTable;
-import org.kududb.client.Operation;
-
-/**
- * Utility class to setup MR jobs that use Kudu as an input and/or output.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableMapReduceUtil {
-  // Mostly lifted from HBase's TableMapReduceUtil
-
-  private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class);
-
-  /**
-   * Doesn't need instantiation
-   */
-  private KuduTableMapReduceUtil() { }
-
-
-  /**
-   * Base class for MR I/O formats, contains the common configurations.
-   */
-  private static abstract class AbstractMapReduceConfigurator<S> {
-    protected final Job job;
-    protected final String table;
-
-    protected boolean addDependencies = true;
-
-    /**
-     * Constructor for the required fields to configure.
-     * @param job a job to configure
-     * @param table a string that contains the name of the table to read from
-     */
-    private AbstractMapReduceConfigurator(Job job, String table) {
-      this.job = job;
-      this.table = table;
-    }
-
-    /**
-     * Sets whether this job should add Kudu's dependencies to the distributed cache. Turned on
-     * by default.
-     * @param addDependencies a boolean that says if we should add the dependencies
-     * @return this instance
-     */
-    @SuppressWarnings("unchecked")
-    public S addDependencies(boolean addDependencies) {
-      this.addDependencies = addDependencies;
-      return (S) this;
-    }
-
-    /**
-     * Configures the job using the passed parameters.
-     * @throws IOException If addDependencies is enabled and a problem is encountered reading
-     * files on the filesystem
-     */
-    public abstract void configure() throws IOException;
-  }
-
-  /**
-   * Builder-like class that sets up the required configurations and classes to write to Kudu.
-   * <p>
-   * Use either child classes when configuring the table output format.
-   */
-  private static abstract class AbstractTableOutputFormatConfigurator
-      <S extends AbstractTableOutputFormatConfigurator<? super S>>
-      extends AbstractMapReduceConfigurator<S> {
-
-    protected String masterAddresses;
-    protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-
-    /**
-     * {@inheritDoc}
-     */
-    private AbstractTableOutputFormatConfigurator(Job job, String table) {
-      super(job, table);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void configure() throws IOException {
-      job.setOutputFormatClass(KuduTableOutputFormat.class);
-      job.setOutputKeyClass(NullWritable.class);
-      job.setOutputValueClass(Operation.class);
-
-      Configuration conf = job.getConfiguration();
-      conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
-      conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, table);
-      conf.setLong(KuduTableOutputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
-      if (addDependencies) {
-        addDependencyJars(job);
-      }
-    }
-  }
-
-  /**
-   * Builder-like class that sets up the required configurations and classes to read from Kudu.
-   * By default, block caching is disabled.
-   * <p>
-   * Use either child classes when configuring the table input format.
-   */
-  private static abstract class AbstractTableInputFormatConfigurator
-      <S extends AbstractTableInputFormatConfigurator<? super S>>
-      extends AbstractMapReduceConfigurator<S> {
-
-    protected String masterAddresses;
-    protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-    protected final String columnProjection;
-    protected boolean cacheBlocks;
-    protected List<KuduPredicate> predicates = new ArrayList<>();
-
-    /**
-     * Constructor for the required fields to configure.
-     * @param job a job to configure
-     * @param table a string that contains the name of the table to read from
-     * @param columnProjection a string containing a comma-separated list of columns to read.
-     *                         It can be null in which case we read empty rows
-     */
-    private AbstractTableInputFormatConfigurator(Job job, String table, String columnProjection) {
-      super(job, table);
-      this.columnProjection = columnProjection;
-    }
-
-    /**
-     * Sets the block caching configuration for the scanners. Turned off by default.
-     * @param cacheBlocks whether the job should use scanners that cache blocks.
-     * @return this instance
-     */
-    public S cacheBlocks(boolean cacheBlocks) {
-      this.cacheBlocks = cacheBlocks;
-      return (S) this;
-    }
-
-    /**
-     * Configures the job with all the passed parameters.
-     * @throws IOException If addDependencies is enabled and a problem is encountered reading
-     * files on the filesystem
-     */
-    public void configure() throws IOException {
-      job.setInputFormatClass(KuduTableInputFormat.class);
-
-      Configuration conf = job.getConfiguration();
-
-      conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
-      conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, table);
-      conf.setLong(KuduTableInputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
-      conf.setBoolean(KuduTableInputFormat.SCAN_CACHE_BLOCKS, cacheBlocks);
-
-      if (columnProjection != null) {
-        conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
-      }
-
-      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
-
-      if (addDependencies) {
-        addDependencyJars(job);
-      }
-    }
-  }
-
-  /**
-   * Returns the provided predicates as a Base64 encoded string.
-   * @param predicates the predicates to encode
-   * @return the encoded predicates
-   */
-  static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    for (KuduPredicate predicate : predicates) {
-      predicate.toPB().writeDelimitedTo(baos);
-    }
-    return Base64.encodeBase64String(baos.toByteArray());
-  }
-
-
-  /**
-   * Table output format configurator to use to specify the parameters directly.
-   */
-  public static class TableOutputFormatConfigurator
-      extends AbstractTableOutputFormatConfigurator<TableOutputFormatConfigurator> {
-
-    /**
-     * Constructor for the required fields to configure.
-     * @param job a job to configure
-     * @param table a string that contains the name of the table to read from
-     * @param masterAddresses a comma-separated list of masters' hosts and ports
-     */
-    public TableOutputFormatConfigurator(Job job, String table, String masterAddresses) {
-      super(job, table);
-      this.masterAddresses = masterAddresses;
-    }
-
-    /**
-     * Sets the timeout for all the operations. The default is 10 seconds.
-     * @param operationTimeoutMs a long that represents the timeout for operations to complete,
-     *                           must be a positive value or 0
-     * @return this instance
-     * @throws IllegalArgumentException if the operation timeout is lower than 0
-     */
-    public TableOutputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
-      if (operationTimeoutMs < 0) {
-        throw new IllegalArgumentException("The operation timeout must be => 0, " +
-            "passed value is: " + operationTimeoutMs);
-      }
-      this.operationTimeoutMs = operationTimeoutMs;
-      return this;
-    }
-  }
-
-  /**
-   * Table output format that uses a {@link CommandLineParser} in order to set the
-   * master config and the operation timeout.
-   */
-  public static class TableOutputFormatConfiguratorWithCommandLineParser extends
-      AbstractTableOutputFormatConfigurator<TableOutputFormatConfiguratorWithCommandLineParser> {
-
-    /**
-     * {@inheritDoc}
-     */
-    public TableOutputFormatConfiguratorWithCommandLineParser(Job job, String table) {
-      super(job, table);
-      CommandLineParser parser = new CommandLineParser(job.getConfiguration());
-      this.masterAddresses = parser.getMasterAddresses();
-      this.operationTimeoutMs = parser.getOperationTimeoutMs();
-    }
-  }
-
-  /**
-   * Table input format configurator to use to specify the parameters directly.
-   */
-  public static class TableInputFormatConfigurator
-      extends AbstractTableInputFormatConfigurator<TableInputFormatConfigurator> {
-
-    /**
-     * Constructor for the required fields to configure.
-     * @param job a job to configure
-     * @param table a string that contains the name of the table to read from
-     * @param columnProjection a string containing a comma-separated list of columns to read.
-     *                         It can be null in which case we read empty rows
-     * @param masterAddresses a comma-separated list of masters' hosts and ports
-     */
-    public TableInputFormatConfigurator(Job job, String table, String columnProjection,
-                                        String masterAddresses) {
-      super(job, table, columnProjection);
-      this.masterAddresses = masterAddresses;
-    }
-
-    /**
-     * Sets the timeout for all the operations. The default is 10 seconds.
-     * @param operationTimeoutMs a long that represents the timeout for operations to complete,
-     *                           must be a positive value or 0
-     * @return this instance
-     * @throws IllegalArgumentException if the operation timeout is lower than 0
-     */
-    public TableInputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
-      if (operationTimeoutMs < 0) {
-        throw new IllegalArgumentException("The operation timeout must be => 0, " +
-            "passed value is: " + operationTimeoutMs);
-      }
-      this.operationTimeoutMs = operationTimeoutMs;
-      return this;
-    }
-
-    /**
-     * Adds a new predicate that will be pushed down to all the tablets.
-     * @param predicate a predicate to add
-     * @return this instance
-     * @deprecated use {@link #addPredicate}
-     */
-    @Deprecated
-    public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
-      return addPredicate(predicate.toKuduPredicate());
-    }
-
-    /**
-     * Adds a new predicate that will be pushed down to all the tablets.
-     * @param predicate a predicate to add
-     * @return this instance
-     */
-    public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
-      this.predicates.add(predicate);
-      return this;
-    }
-  }
-
-  /**
-   * Table input format that uses a {@link CommandLineParser} in order to set the
-   * master config and the operation timeout.
-   * This version cannot set column range predicates.
-   */
-  public static class TableInputFormatConfiguratorWithCommandLineParser extends
-      AbstractTableInputFormatConfigurator<TableInputFormatConfiguratorWithCommandLineParser> {
-
-    /**
-     * {@inheritDoc}
-     */
-    public TableInputFormatConfiguratorWithCommandLineParser(Job job,
-                                                             String table,
-                                                             String columnProjection) {
-      super(job, table, columnProjection);
-      CommandLineParser parser = new CommandLineParser(job.getConfiguration());
-      this.masterAddresses = parser.getMasterAddresses();
-      this.operationTimeoutMs = parser.getOperationTimeoutMs();
-    }
-  }
-
-  /**
-   * Use this method when setting up a task to get access to the KuduTable in order to create
-   * Inserts, Updates, and Deletes.
-   * @param context Map context
-   * @return The kudu table object as setup by the output format
-   */
-  @SuppressWarnings("rawtypes")
-  public static KuduTable getTableFromContext(TaskInputOutputContext context) {
-    String multitonKey = context.getConfiguration().get(KuduTableOutputFormat.MULTITON_KEY);
-    return KuduTableOutputFormat.getKuduTable(multitonKey);
-  }
-
-  /**
-   * Add the Kudu dependency jars as well as jars for any of the configured
-   * job classes to the job configuration, so that JobClient will ship them
-   * to the cluster and add them to the DistributedCache.
-   */
-  public static void addDependencyJars(Job job) throws IOException {
-    addKuduDependencyJars(job.getConfiguration());
-    try {
-      addDependencyJars(job.getConfiguration(),
-          // when making changes here, consider also mapred.TableMapReduceUtil
-          // pull job classes
-          job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(),
-          job.getInputFormatClass(),
-          job.getOutputKeyClass(),
-          job.getOutputValueClass(),
-          job.getOutputFormatClass(),
-          job.getPartitionerClass(),
-          job.getCombinerClass());
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Add the jars containing the given classes to the job's configuration
-   * such that JobClient will ship them to the cluster and add them to
-   * the DistributedCache.
-   */
-  public static void addDependencyJars(Configuration conf,
-                                       Class<?>... classes) throws IOException {
-
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    // Add jars that are already in the tmpjars variable
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    // add jars as we find them to a map of contents jar name so that we can avoid
-    // creating new jars for classes that have already been packaged.
-    Map<String, String> packagedClasses = new HashMap<String, String>();
-
-    // Add jars containing the specified classes
-    for (Class<?> clazz : classes) {
-      if (clazz == null) continue;
-
-      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
-      if (path == null) {
-        LOG.warn("Could not find jar for class " + clazz +
-            " in order to ship it to the cluster.");
-        continue;
-      }
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path + " for class "
-            + clazz);
-        continue;
-      }
-      jars.add(path.toString());
-    }
-    if (jars.isEmpty()) return;
-
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
-  }
-
-  /**
-   * Add Kudu and its dependencies (only) to the job configuration.
-   * <p>
-   * This is intended as a low-level API, facilitating code reuse between this
-   * class and its mapred counterpart. It also of use to external tools that
-   * need to build a MapReduce job that interacts with Kudu but want
-   * fine-grained control over the jars shipped to the cluster.
-   * </p>
-   * @param conf The Configuration object to extend with dependencies.
-   * @see KuduTableMapReduceUtil
-   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
-   */
-  public static void addKuduDependencyJars(Configuration conf) throws IOException {
-    addDependencyJars(conf,
-        // explicitly pull a class from each module
-        Operation.class,                      // kudu-client
-        KuduTableMapReduceUtil.class,   // kudu-mapreduce
-        // pull necessary dependencies
-        com.stumbleupon.async.Deferred.class);
-  }
-
-  /**
-   * Finds the Jar for a class or creates it if it doesn't exist. If the class
-   * is in a directory in the classpath, it creates a Jar on the fly with the
-   * contents of the directory and returns the path to that Jar. If a Jar is
-   * created, it is created in the system temporary directory. Otherwise,
-   * returns an existing jar that contains a class of the same name. Maintains
-   * a mapping from jar contents to the tmp jar created.
-   * @param my_class the class to find.
-   * @param fs the FileSystem with which to qualify the returned path.
-   * @param packagedClasses a map of class name to path.
-   * @return a jar file that contains the class.
-   * @throws IOException
-   */
-  @SuppressWarnings("deprecation")
-  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
-                                      Map<String, String> packagedClasses)
-      throws IOException {
-    // attempt to locate an existing jar for the class.
-    String jar = findContainingJar(my_class, packagedClasses);
-    if (null == jar || jar.isEmpty()) {
-      jar = JarFinder.getJar(my_class);
-      updateMap(jar, packagedClasses);
-    }
-
-    if (null == jar || jar.isEmpty()) {
-      return null;
-    }
-
-    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
-    return new Path(jar).makeQualified(fs);
-  }
-
-  /**
-   * Find a jar that contains a class of the same name, if any. It will return
-   * a jar file, even if that is not the first thing on the class path that
-   * has a class with the same name. Looks first on the classpath and then in
-   * the <code>packagedClasses</code> map.
-   * @param my_class the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
-      throws IOException {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-
-    // first search the classpath
-    for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
-      URL url = itr.nextElement();
-      if ("jar".equals(url.getProtocol())) {
-        String toReturn = url.getPath();
-        if (toReturn.startsWith("file:")) {
-          toReturn = toReturn.substring("file:".length());
-        }
-        // URLDecoder is a misnamed class, since it actually decodes
-        // x-www-form-urlencoded MIME type rather than actual
-        // URL encoding (which the file path has). Therefore it would
-        // decode +s to ' 's which is incorrect (spaces are actually
-        // either unencoded or encoded as "%20"). Replace +s first, so
-        // that they are kept sacred during the decoding process.
-        toReturn = toReturn.replaceAll("\\+", "%2B");
-        toReturn = URLDecoder.decode(toReturn, "UTF-8");
-        return toReturn.replaceAll("!.*$", "");
-      }
-    }
-
-    // now look in any jars we've packaged using JarFinder. Returns null when
-    // no jar is found.
-    return packagedClasses.get(class_file);
-  }
-
-  /**
-   * Add entries to <code>packagedClasses</code> corresponding to class files
-   * contained in <code>jar</code>.
-   * @param jar The jar who's content to list.
-   * @param packagedClasses map[class -> jar]
-   */
-  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
-    if (null == jar || jar.isEmpty()) {
-      return;
-    }
-    ZipFile zip = null;
-    try {
-      zip = new ZipFile(jar);
-      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
-        ZipEntry entry = iter.nextElement();
-        if (entry.getName().endsWith("class")) {
-          packagedClasses.put(entry.getName(), jar);
-        }
-      }
-    } finally {
-      if (null != zip) zip.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
deleted file mode 100644
index 8af750b..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-
-import java.io.IOException;
-
-/**
- * Small committer class that does not do anything.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableOutputCommitter extends OutputCommitter {
-  @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-
-  }
-
-  @Override
-  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-  }
-
-  @Override
-  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-  }
-
-  @Override
-  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
deleted file mode 100644
index e80b73f..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
+++ /dev/null
@@ -1,215 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.*;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p>
- * Use {@link
- * KuduTableMapReduceUtil.TableOutputFormatConfigurator}
- * to correctly setup this output format, then {@link
- * KuduTableMapReduceUtil#getTableFromContext(org.apache.hadoop.mapreduce.TaskInputOutputContext)}
- * to get a KuduTable.
- * </p>
- *
- * <p>
- * Hadoop doesn't have the concept of "closing" the output format so in order to release the
- * resources we assume that once either
- * {@link #checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link TableRecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)}
- * have been called that the object won't be used again and the KuduClient is shut down.
- * </p>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation>
-    implements Configurable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KuduTableOutputFormat.class);
-
-  /** Job parameter that specifies the output table. */
-  static final String OUTPUT_TABLE_KEY = "kudu.mapreduce.output.table";
-
-  /** Job parameter that specifies where the masters are */
-  static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.addresses";
-
-  /** Job parameter that specifies how long we wait for operations to complete */
-  static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
-
-  /** Number of rows that are buffered before flushing to the tablet server */
-  static final String BUFFER_ROW_COUNT_KEY = "kudu.mapreduce.buffer.row.count";
-
-  /**
-   * Job parameter that specifies which key is to be used to reach the KuduTableOutputFormat
-   * belonging to the caller
-   */
-  static final String MULTITON_KEY = "kudu.mapreduce.multitonkey";
-
-  /**
-   * This multiton is used so that the tasks using this output format/record writer can find
-   * their KuduTable without having a direct dependency on this class,
-   * with the additional complexity that the output format cannot be shared between threads.
-   */
-  private static final ConcurrentHashMap<String, KuduTableOutputFormat> MULTITON = new
-      ConcurrentHashMap<String, KuduTableOutputFormat>();
-
-  /**
-   * This counter helps indicate which task log to look at since rows that weren't applied will
-   * increment this counter.
-   */
-  public enum Counters { ROWS_WITH_ERRORS }
-
-  private Configuration conf = null;
-
-  private KuduClient client;
-  private KuduTable table;
-  private KuduSession session;
-  private long operationTimeoutMs;
-
-  @Override
-  public void setConf(Configuration entries) {
-    this.conf = new Configuration(entries);
-
-    String masterAddress = this.conf.get(MASTER_ADDRESSES_KEY);
-    String tableName = this.conf.get(OUTPUT_TABLE_KEY);
-    this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY,
-        AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
-    int bufferSpace = this.conf.getInt(BUFFER_ROW_COUNT_KEY, 1000);
-
-    this.client = new KuduClient.KuduClientBuilder(masterAddress)
-        .defaultOperationTimeoutMs(operationTimeoutMs)
-        .build();
-    try {
-      this.table = client.openTable(tableName);
-    } catch (Exception ex) {
-      throw new RuntimeException("Could not obtain the table from the master, " +
-          "is the master running and is this table created? tablename=" + tableName + " and " +
-          "master address= " + masterAddress, ex);
-    }
-    this.session = client.newSession();
-    this.session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
-    this.session.setMutationBufferSpace(bufferSpace);
-    this.session.setIgnoreAllDuplicateRows(true);
-    String multitonKey = String.valueOf(Thread.currentThread().getId());
-    assert(MULTITON.get(multitonKey) == null);
-    MULTITON.put(multitonKey, this);
-    entries.set(MULTITON_KEY, multitonKey);
-  }
-
-  private void shutdownClient() throws IOException {
-    try {
-      client.shutdown();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  public static KuduTable getKuduTable(String multitonKey) {
-    return MULTITON.get(multitonKey).getKuduTable();
-  }
-
-  private KuduTable getKuduTable() {
-    return this.table;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public RecordWriter<NullWritable, Operation> getRecordWriter(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    return new TableRecordWriter(this.session);
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
-    shutdownClient();
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws
-      IOException, InterruptedException {
-    return new KuduTableOutputCommitter();
-  }
-
-  protected class TableRecordWriter extends RecordWriter<NullWritable, Operation> {
-
-    private final AtomicLong rowsWithErrors = new AtomicLong();
-    private final KuduSession session;
-
-    public TableRecordWriter(KuduSession session) {
-      this.session = session;
-    }
-
-    @Override
-    public void write(NullWritable key, Operation operation)
-        throws IOException, InterruptedException {
-      try {
-        session.apply(operation);
-      } catch (Exception e) {
-        throw new IOException("Encountered an error while writing", e);
-      }
-    }
-
-    @Override
-    public void close(TaskAttemptContext taskAttemptContext) throws IOException,
-        InterruptedException {
-      try {
-        processRowErrors(session.close());
-        shutdownClient();
-      } catch (Exception e) {
-        throw new IOException("Encountered an error while closing this task", e);
-      } finally {
-        if (taskAttemptContext != null) {
-          // This is the only place where we have access to the context in the record writer,
-          // so set the counter here.
-          taskAttemptContext.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
-        }
-      }
-    }
-
-    private void processRowErrors(List<OperationResponse> responses) {
-      List<RowError> errors = OperationResponse.collectErrors(responses);
-      if (!errors.isEmpty()) {
-        int rowErrorsCount = errors.size();
-        rowsWithErrors.addAndGet(rowErrorsCount);
-        LOG.warn("Got per errors for " + rowErrorsCount + " rows, " +
-            "the first one being " + errors.get(0).getStatus());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
deleted file mode 100644
index 7cf3ada..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.Operation;
-import org.apache.hadoop.mapreduce.Reducer;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
-    extends Reducer<KEYIN, VALUEIN, KEYOUT, Operation> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
new file mode 100644
index 0000000..1e2cb41
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.mapreduce;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * This class is analog to HBaseTestingUtility except that we only need it for the MR tests.
+ */
+public class HadoopTestingUtility {
+
+  private static final Log LOG = LogFactory.getLog(HadoopTestingUtility.class);
+
+  private File testDir;
+
+  private Configuration conf = new Configuration();
+
+  /**
+   * System property key to get base test directory value
+   */
+  public static final String BASE_TEST_DIRECTORY_KEY =
+      "test.build.data.basedirectory";
+
+  /**
+   * Default base directory for test output.
+   */
+  private static final String DEFAULT_BASE_TEST_DIRECTORY = "target/mr-data";
+
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  /**
+   * Sets up a temporary directory for the test to run in. Call cleanup() at the end of your
+   * tests to remove it.
+   * @param testName Will be used to build a part of the directory name for the test
+   * @return Where the test is homed
+   */
+  public File setupAndGetTestDir(String testName, Configuration conf) {
+    if (this.testDir != null) {
+      return this.testDir;
+    }
+    Path testPath = new Path(getBaseTestDir(), testName + System.currentTimeMillis());
+    this.testDir = new File(testPath.toString()).getAbsoluteFile();
+    this.testDir.mkdirs();
+    // Set this property so when mapreduce jobs run, they will use this as their home dir.
+    System.setProperty("test.build.dir", this.testDir.toString());
+    System.setProperty("hadoop.home.dir", this.testDir.toString());
+    conf.set("hadoop.tmp.dir", this.testDir.toString() + "/mapred");
+
+    LOG.info("Test configured to write to " + this.testDir);
+    return this.testDir;
+  }
+
+  private Path getBaseTestDir() {
+    String pathName = System.getProperty(BASE_TEST_DIRECTORY_KEY, DEFAULT_BASE_TEST_DIRECTORY);
+    return new Path(pathName);
+  }
+
+  public void cleanup() throws IOException {
+    FileSystem.closeAll();
+    if (this.testDir != null) {
+      delete(this.testDir);
+    }
+  }
+
+  private void delete(File dir) throws IOException {
+    if (dir == null || !dir.exists()) {
+      return;
+    }
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException ex) {
+      LOG.warn("Failed to delete " + dir.getAbsolutePath());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
new file mode 100644
index 0000000..3d04043
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class ITInputFormatJob extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ITInputFormatJob.class);
+
+  private static final String TABLE_NAME =
+      ITInputFormatJob.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  /** Counter enumeration to count the actual rows. */
+  private static enum Counters { ROWS }
+
+  /** Delegates to the base-class setup; runs once before the tests of this class. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  /**
+   * Runs the base-class teardown, then cleans up the Hadoop test directory. The cleanup sits in
+   * a finally block so it still runs if the base-class teardown throws.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  /**
+   * Runs the row-counting job against a nine-row table: first without predicates, then with
+   * one predicate and finally with two, checking the number of rows counted each time.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void test() throws Exception {
+
+    createFourTabletsTableWithNineRows(TABLE_NAME);
+
+    Configuration conf = new Configuration();
+    // Called for its side effects only (creates the test dir and configures conf); the
+    // returned directory isn't needed here.
+    HADOOP_UTIL.setupAndGetTestDir(ITInputFormatJob.class.getName(), conf);
+
+    // No predicates: all nine rows are expected.
+    createAndTestJob(conf, new ArrayList<KuduPredicate>(), 9);
+
+    // A single predicate on column 0.
+    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+        basicSchema.getColumnByIndex(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 20);
+    createAndTestJob(conf, Lists.newArrayList(pred1), 6);
+
+    // Both predicates together: column 0 and column 2.
+    KuduPredicate pred2 = KuduPredicate.newComparisonPredicate(
+        basicSchema.getColumnByIndex(2), KuduPredicate.ComparisonOp.LESS_EQUAL, 1);
+    createAndTestJob(conf, Lists.newArrayList(pred1, pred2), 2);
+  }
+
+  /**
+   * Configures and runs a map-only job that scans TABLE_NAME with the given predicates, then
+   * checks the value of the ROWS counter.
+   *
+   * @param conf Hadoop configuration used to create the job
+   * @param predicates predicates added to the table input format, may be empty
+   * @param expectedCount number of rows the mapper is expected to count
+   * @throws Exception if the job cannot be configured or run
+   */
+  @SuppressWarnings("deprecation") // for the Job(Configuration, String) constructor
+  private void createAndTestJob(Configuration conf,
+                                List<KuduPredicate> predicates, int expectedCount)
+      throws Exception {
+    String jobName = ITInputFormatJob.class.getName();
+    Job job = new Job(conf, jobName);
+
+    // Map-only job whose output is discarded; only the counter matters.
+    Class<TestMapperTableInput> mapperClass = TestMapperTableInput.class;
+    job.setJarByClass(mapperClass);
+    job.setMapperClass(mapperClass);
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    KuduTableMapReduceUtil.TableInputFormatConfigurator configurator =
+        new KuduTableMapReduceUtil.TableInputFormatConfigurator(
+            job,
+            TABLE_NAME,
+            "*",
+            getMasterAddresses())
+            .operationTimeoutMs(DEFAULT_SLEEP)
+            .addDependencies(false)
+            .cacheBlocks(false);
+    for (KuduPredicate predicate : predicates) {
+      configurator.addPredicate(predicate);
+    }
+    configurator.configure();
+
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    assertEquals(expectedCount, job.getCounters().findCounter(Counters.ROWS).getValue());
+  }
+
+  /**
+   * Simple row counter and printer.
+   */
+  static class TestMapperTableInput extends
+      Mapper<NullWritable, RowResult, NullWritable, NullWritable> {
+
+    /** Increments the ROWS counter for the row and logs it; nothing is written to the context. */
+    @Override
+    protected void map(NullWritable key, RowResult value, Context context) throws IOException,
+        InterruptedException {
+      context.getCounter(Counters.ROWS).increment(1);
+      LOG.info(value.toStringLongFormat()); // useful for visual debugging
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
new file mode 100644
index 0000000..ff4d81a
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.kududb.Schema;
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class ITKuduTableInputFormat extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITKuduTableInputFormat.class.getName() + "-" + System.currentTimeMillis();
+
+  /**
+   * Inserts a single row and reads it back through KuduTableInputFormat with several column
+   * projections, an unknown column and a predicate that filters the row out.
+   */
+  @Test
+  public void test() throws Exception {
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+
+    // Insert the one row used by every scenario below: (1, 2, 3, "a string", true).
+    KuduTable table = openTable(TABLE_NAME);
+    Schema schema = getBasicSchema();
+    Insert insert = table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addInt(0, 1);
+    row.addInt(1, 2);
+    row.addInt(2, 3);
+    row.addString(3, "a string");
+    row.addBoolean(4, true);
+    AsyncKuduSession session = client.newSession();
+    session.apply(insert).join(DEFAULT_SLEEP);
+    session.close().join(DEFAULT_SLEEP);
+
+    // Test getting all the columns back
+    RecordReader<NullWritable, RowResult> reader = createRecordReader("*", null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(5, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertFalse(reader.nextKeyValue());
+
+    // Test getting two columns back; the assertions expect them in the requested order
+    // (column 3 first, then column 2).
+    reader = createRecordReader(schema.getColumnByIndex(3).getName() + "," +
+        schema.getColumnByIndex(2).getName(), null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(2, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertEquals("a string", reader.getCurrentValue().getString(0));
+    assertEquals(3, reader.getCurrentValue().getInt(1));
+    try {
+      reader.getCurrentValue().getString(2);
+      fail("Should only be getting 2 columns back");
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+
+    // Test getting one column back
+    reader = createRecordReader(schema.getColumnByIndex(1).getName(), null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(1, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertEquals(2, reader.getCurrentValue().getInt(0));
+    try {
+      reader.getCurrentValue().getString(1);
+      fail("Should only be getting 1 column back");
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+
+    // Test getting empty rows back
+    reader = createRecordReader("", null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(0, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertFalse(reader.nextKeyValue());
+
+    // Test projecting an unknown column, will not work
+    try {
+      createRecordReader("unknown", null);
+      fail("Should not be able to scan a column that doesn't exist");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    // Test using a predicate that filters the row out.
+    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+        schema.getColumnByIndex(1), KuduPredicate.ComparisonOp.GREATER_EQUAL, 3);
+    reader = createRecordReader("*", Lists.newArrayList(pred1));
+    assertFalse(reader.nextKeyValue());
+  }
+
+  /**
+   * Builds an initialized record reader over the single input split of TABLE_NAME.
+   *
+   * @param columnProjection value for the column projection key, not set when null
+   * @param predicates predicates to encode into the configuration, not set when null
+   * @return a reader initialized with the table's only split
+   */
+  private RecordReader<NullWritable, RowResult> createRecordReader(String columnProjection,
+        List<KuduPredicate> predicates) throws IOException, InterruptedException {
+    // This first instance is only used to compute the splits.
+    KuduTableInputFormat splitSource = new KuduTableInputFormat();
+    Configuration conf = new Configuration();
+    conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
+    conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, TABLE_NAME);
+    if (columnProjection != null) {
+      conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
+    }
+    if (predicates != null) {
+      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY,
+          KuduTableMapReduceUtil.base64EncodePredicates(predicates));
+    }
+    splitSource.setConf(conf);
+    List<InputSplit> splits = splitSource.getSplits(null);
+
+    // A second input format is needed to reconnect the client before reading.
+    KuduTableInputFormat readerSource = new KuduTableInputFormat();
+    readerSource.setConf(conf);
+    RecordReader<NullWritable, RowResult> recordReader =
+        readerSource.createRecordReader(null, null);
+    recordReader.initialize(Iterables.getOnlyElement(splits), null);
+    return recordReader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
new file mode 100644
index 0000000..86452ed
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ITKuduTableOutputFormat extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITKuduTableOutputFormat.class.getName() + "-" + System.currentTimeMillis();
+
+  /** Delegates to the base-class setup; runs once before the tests of this class. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  /**
+   * Writes one insert through the record writer of KuduTableOutputFormat and checks that a
+   * scan of the table then returns exactly one row.
+   */
+  @Test
+  public void test() throws Exception {
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+
+    KuduTableOutputFormat outputFormat = new KuduTableOutputFormat();
+    Configuration conf = new Configuration();
+    conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
+    conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME);
+    outputFormat.setConf(conf);
+
+    // Look the table up through the multiton key read back from the configuration.
+    KuduTable table =
+        KuduTableOutputFormat.getKuduTable(conf.get(KuduTableOutputFormat.MULTITON_KEY));
+    assertNotNull(table);
+
+    Insert insert = table.newInsert();
+    PartialRow newRow = insert.getRow();
+    newRow.addInt(0, 1);
+    newRow.addInt(1, 2);
+    newRow.addInt(2, 3);
+    newRow.addString(3, "a string");
+    newRow.addBoolean(4, true);
+
+    RecordWriter<NullWritable, Operation> writer = outputFormat.getRecordWriter(null);
+    writer.write(NullWritable.get(), insert);
+    writer.close(null);
+
+    assertEquals(1, countRowsInScan(client.newScannerBuilder(table).build()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
new file mode 100644
index 0000000..dff2400
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class ITOutputFormatJob extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITOutputFormatJob.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  /** Runs the base-class setup, then creates the table the job writes to. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+  }
+
+  /**
+   * Runs the base-class teardown, then cleans up the Hadoop test directory. The cleanup sits in
+   * a finally block so it still runs if the base-class teardown throws.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  /**
+   * Runs a map-only job that reads a two-line text file and writes one row per line to
+   * TABLE_NAME, then checks that a scan of the table returns two rows.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+        HADOOP_UTIL.setupAndGetTestDir(ITOutputFormatJob.class.getName(), conf).getAbsolutePath();
+    Job job = new Job(conf, ITOutputFormatJob.class.getName());
+
+    // Create a two-line input file and use it as the job's input
+    File inputFile = new File(testHome, "data.txt");
+    writeDataFile(inputFile);
+    FileInputFormat.setInputPaths(job, inputFile.toString());
+
+    // Configure the job to map the file and write to kudu, without reducers
+    Class<TestMapperTableOutput> mapperClass = TestMapperTableOutput.class;
+    job.setJarByClass(mapperClass);
+    job.setMapperClass(mapperClass);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setNumReduceTasks(0);
+    new KuduTableMapReduceUtil.TableOutputFormatConfigurator(
+        job,
+        TABLE_NAME,
+        getMasterAddresses())
+        .operationTimeoutMs(DEFAULT_SLEEP)
+        .addDependencies(false)
+        .configure();
+
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    // Make sure the data's there
+    KuduTable table = openTable(TABLE_NAME);
+    assertEquals(2, countRowsInScan(client.newScannerBuilder(table).build()));
+  }
+
+  /**
+   * Simple Mapper that writes one row per input line: column 0 is the LongWritable key supplied
+   * by the input format (cast to int) and the STRING column is the data from that line.
+   */
+  static class TestMapperTableOutput extends
+      Mapper<LongWritable, Text, NullWritable, Operation> {
+
+    private KuduTable table;
+
+    /** Fetches the table from the task context before any record is mapped. */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      super.setup(context);
+      table = KuduTableMapReduceUtil.getTableFromContext(context);
+    }
+
+    /** Builds an insert for the given line and emits it as the output value. */
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
+        InterruptedException {
+      Insert rowInsert = table.newInsert();
+      PartialRow newRow = rowInsert.getRow();
+      newRow.addInt(0, (int) key.get());
+      newRow.addInt(1, 1);
+      newRow.addInt(2, 2);
+      newRow.addString(3, value.toString());
+      newRow.addBoolean(4, true);
+      context.write(NullWritable.get(), rowInsert);
+    }
+  }
+
+  /**
+   * Writes the two-line test input ("VALUE1" and "VALUE2") to the given file.
+   *
+   * @param data file to create or overwrite
+   * @throws IOException if the file cannot be opened or written
+   */
+  private void writeDataFile(File data) throws IOException {
+    FileOutputStream fos = new FileOutputStream(data);
+    try {
+      // Use an explicit charset so the bytes don't depend on the platform default.
+      fos.write("VALUE1\nVALUE2\n".getBytes("UTF-8"));
+    } finally {
+      // Close the stream even if the write fails.
+      fos.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
new file mode 100644
index 0000000..3801a0c
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.text.MessageFormat;
+import java.util.Properties;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+/**
+ * This file was forked from hbase/branches/master@4ce6f48.
+ */
+public class TestJarFinder {
+
+  /** Checks that the JAR reported for a class that lives in a JAR exists on disk. */
+  @Test
+  public void testJar() throws Exception {
+    // LogFactory is picked because it is for sure in a JAR in the classpath
+    File jarFile = new File(JarFinder.getJar(LogFactory.class));
+    Assert.assertTrue(jarFile.exists());
+  }
+
+  /**
+   * Recursively deletes a file or directory, refusing paths that look suspiciously short.
+   *
+   * @param file file or directory to remove; nothing happens if it doesn't exist
+   * @throws IllegalArgumentException if the absolute path is shorter than 5 characters
+   * @throws RuntimeException if a file or directory could not be deleted
+   */
+  private static void delete(File file) throws IOException {
+    String path = file.getAbsolutePath();
+    if (path.length() < 5) {
+      throw new IllegalArgumentException(
+        MessageFormat.format("Path [{0}] is too short, not deleting", path));
+    }
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      // Empty the directory first; listFiles() may return null.
+      File[] entries = file.listFiles();
+      if (entries != null) {
+        for (File entry : entries) {
+          delete(entry);
+        }
+      }
+    }
+    if (!file.delete()) {
+      throw new RuntimeException(
+        MessageFormat.format("Could not delete path [{0}]", path));
+    }
+  }
+
+  /** Checks that the JAR reported for a class that lives in a directory exists on disk. */
+  @Test
+  public void testExpandedClasspath() throws Exception {
+    // TestJarFinder itself is for sure in a directory in the classpath,
+    // so in this case the JAR is created on the fly
+    File jarFile = new File(JarFinder.getJar(TestJarFinder.class));
+    Assert.assertTrue(jarFile.exists());
+  }
+
+  /**
+   * Jars a directory that already contains META-INF/MANIFEST.MF and checks that the resulting
+   * JAR has a manifest.
+   */
+  @Test
+  public void testExistingManifest() throws Exception {
+    // Start from a fresh, empty directory
+    File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+      TestJarFinder.class.getName() + "-testExistingManifest");
+    delete(dir);
+    dir.mkdirs();
+
+    // Write a default manifest under META-INF
+    File metaInfDir = new File(dir, "META-INF");
+    metaInfDir.mkdirs();
+    File manifestFile = new File(metaInfDir, "MANIFEST.MF");
+    Manifest manifest = new Manifest();
+    OutputStream manifestOut = new FileOutputStream(manifestFile);
+    manifest.write(manifestOut);
+    manifestOut.close();
+
+    // Add one regular file next to it
+    Writer propsWriter = new FileWriter(new File(dir, "props.properties"));
+    new Properties().store(propsWriter, "");
+    propsWriter.close();
+
+    // Jar the directory in memory and read the result back
+    ByteArrayOutputStream jarBytes = new ByteArrayOutputStream();
+    JarOutputStream jarOut = new JarOutputStream(jarBytes);
+    JarFinder.jarDir(dir, "", jarOut);
+    JarInputStream jarIn =
+      new JarInputStream(new ByteArrayInputStream(jarBytes.toByteArray()));
+    Assert.assertNotNull(jarIn.getManifest());
+    jarIn.close();
+  }
+
+  /**
+   * Jars a directory that contains no manifest file and checks that the resulting JAR still has
+   * a manifest.
+   */
+  @Test
+  public void testNoManifest() throws Exception {
+    // Start from a fresh, empty directory
+    File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+      TestJarFinder.class.getName() + "-testNoManifest");
+    delete(dir);
+    dir.mkdirs();
+
+    // The only content is one regular file
+    Writer propsWriter = new FileWriter(new File(dir, "props.properties"));
+    new Properties().store(propsWriter, "");
+    propsWriter.close();
+
+    // Jar the directory in memory and read the result back
+    ByteArrayOutputStream jarBytes = new ByteArrayOutputStream();
+    JarOutputStream jarOut = new JarOutputStream(jarBytes);
+    JarFinder.jarDir(dir, "", jarOut);
+    JarInputStream jarIn =
+      new JarInputStream(new ByteArrayInputStream(jarBytes.toByteArray()));
+    Assert.assertNotNull(jarIn.getManifest());
+    jarIn.close();
+  }
+}
\ No newline at end of file