You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:08 UTC
[03/36] incubator-kudu git commit: [java-client] repackage to
org.apache.kudu (Part 1)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
deleted file mode 100644
index 57593db..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/JarFinder.java
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import com.google.common.base.Preconditions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.text.MessageFormat;
-import java.util.Enumeration;
-import java.util.jar.JarFile;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
/**
 * Finds the Jar for a class. If the class is in a directory in the
 * classpath, it creates a Jar on the fly with the contents of the directory
 * and returns the path to that Jar. A Jar created this way is placed in the
 * directory named by the {@code test.build.dir} system property (default
 * {@code target/test-dir}, resolved against the working directory) and is
 * marked for deletion on JVM exit.
 *
 * This file was forked from hbase/branches/master@4ce6f48.
 */
public class JarFinder {

  /** Size of the buffer used when copying a file's bytes into a ZIP stream. */
  private static final int COPY_BUFFER_SIZE = 4096;

  /**
   * Writes the contents of {@code file} into {@code zos} as {@code entry}.
   * The entry is closed once it has been opened, and the file is always closed.
   */
  private static void copyToZipStream(File file, ZipEntry entry,
                                      ZipOutputStream zos) throws IOException {
    try (InputStream is = new FileInputStream(file)) {
      zos.putNextEntry(entry);
      try {
        byte[] buffer = new byte[COPY_BUFFER_SIZE];
        int read;
        while ((read = is.read(buffer)) != -1) {
          zos.write(buffer, 0, read);
        }
      } finally {
        zos.closeEntry();
      }
    }
  }

  /**
   * Writes the contents of {@code dir} into {@code zos} as a JAR and closes {@code zos}.
   * The manifest entry is always written first: it is copied from
   * {@code dir/META-INF/MANIFEST.MF} when that file exists, otherwise an empty
   * manifest is generated.
   *
   * @param dir the directory whose contents are archived
   * @param relativePath prefix prepended to every entry name below the root
   * @param zos the stream to write to; closed on successful completion
   * @throws IOException if reading a file or writing the archive fails
   * @throws NullPointerException if {@code relativePath} or {@code zos} is null
   */
  public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
      throws IOException {
    java.util.Objects.requireNonNull(relativePath, "relativePath");
    java.util.Objects.requireNonNull(zos, "zos");

    // By JAR spec, if there is a manifest, it must be the first entry in the ZIP.
    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
    if (manifestFile.exists()) {
      copyToZipStream(manifestFile, manifestEntry, zos);
    } else {
      zos.putNextEntry(manifestEntry);
      new Manifest().write(zos);
      zos.closeEntry();
    }
    zipDir(dir, relativePath, zos, true);
    zos.close();
  }

  /**
   * Recursively adds the non-hidden contents of {@code dir} to {@code zos}.
   *
   * @param start true only for the root call; immediate sub-directories of the root
   *              do not get explicit directory entries
   */
  private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
                             boolean start) throws IOException {
    String[] children = dir.list();
    if (children == null) {
      // File.list() returns null when dir is not a directory or cannot be read.
      throw new IOException(MessageFormat.format("could not list dir [{0}]", dir));
    }
    for (String child : children) {
      File f = new File(dir, child);
      if (f.isHidden()) {
        continue;
      }
      if (f.isDirectory()) {
        String dirPath = relativePath + f.getName() + "/";
        if (!start) {
          zos.putNextEntry(new ZipEntry(dirPath));
          zos.closeEntry();
        }
        zipDir(f, dirPath, zos, false);
      } else {
        String path = relativePath + f.getName();
        // jarDir already wrote the manifest as the first entry; don't add it twice.
        if (!path.equals(JarFile.MANIFEST_NAME)) {
          copyToZipStream(f, new ZipEntry(path), zos);
        }
      }
    }
  }

  /**
   * Creates {@code jarFile} (and its parent directory if needed) holding the contents
   * of {@code dir}.
   */
  private static void createJar(File dir, File jarFile) throws IOException {
    java.util.Objects.requireNonNull(dir, "dir");
    java.util.Objects.requireNonNull(jarFile, "jarFile");
    File jarDir = jarFile.getParentFile();
    if (jarDir != null && !jarDir.exists() && !jarDir.mkdirs()) {
      throw new IOException(MessageFormat.format("could not create dir [{0}]", jarDir));
    }
    // jarDir() closes the stream on success; try-with-resources also releases the
    // file handle when archiving fails part-way (closing twice is harmless).
    try (FileOutputStream fos = new FileOutputStream(jarFile);
         JarOutputStream zos = new JarOutputStream(fos)) {
      jarDir(dir, "", zos);
    }
  }

  /**
   * Returns the full path to the Jar containing the class. It always returns a
   * JAR: if the class was loaded from a classpath directory, that directory is
   * packaged into a temporary Jar first.
   *
   * @param klass class.
   *
   * @return path to the Jar containing the class, or null if the class has no
   *         class loader (bootstrap class) or no jar/file resource was found.
   * @throws RuntimeException wrapping any IOException raised while locating or
   *         building the Jar
   */
  public static String getJar(Class<?> klass) {
    java.util.Objects.requireNonNull(klass, "klass");
    ClassLoader loader = klass.getClassLoader();
    if (loader == null) {
      return null;
    }
    String classFile = klass.getName().replace('.', '/') + ".class";
    try {
      Enumeration<URL> resources = loader.getResources(classFile);
      while (resources.hasMoreElements()) {
        URL url = resources.nextElement();
        String path = url.getPath();
        if (path.startsWith("file:")) {
          path = path.substring("file:".length());
        }
        // Decode exactly once: a second pass would turn an escaped '%' ("%25")
        // into the start of another escape and corrupt the path.
        // NOTE(review): URLDecoder also maps '+' to ' ', which is not URL-path
        // semantics; kept for compatibility.
        path = URLDecoder.decode(path, "UTF-8");
        if ("jar".equals(url.getProtocol())) {
          // Drop the "!/entry/inside/jar" suffix, keeping only the jar's path.
          return path.replaceAll("!.*$", "");
        } else if ("file".equals(url.getProtocol())) {
          // Strip the class file's relative path to get the classpath directory.
          path = path.substring(0, path.length() - classFile.length());
          File baseDir = new File(path);
          File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"))
              .getAbsoluteFile();
          if (!testDir.exists()) {
            testDir.mkdirs();
          }
          // Create the temp file with the ".jar" suffix directly so that no extra,
          // never-deleted placeholder file is left behind in testDir.
          File tempJar = File.createTempFile("hadoop-", ".jar", testDir);
          tempJar.deleteOnExit();
          createJar(baseDir, tempJar);
          return tempJar.getAbsolutePath();
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
deleted file mode 100644
index 25235cb..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/**
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-package org.kududb.mapreduce;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.naming.NamingException;
-
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
-import org.kududb.Common;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.kududb.client.Bytes;
-import org.kududb.client.KuduClient;
-import org.kududb.client.KuduPredicate;
-import org.kududb.client.KuduScanner;
-import org.kududb.client.KuduTable;
-import org.kududb.client.LocatedTablet;
-import org.kududb.client.RowResult;
-import org.kududb.client.RowResultIterator;
-import org.kududb.client.KuduScanToken;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * This input format generates one split per tablet and the only location for each split is that
- * tablet's leader.
- * </p>
- *
- * <p>
- * Hadoop doesn't have the concept of "closing" the input format so in order to release the
- * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
- * the object won't be used again and the AsyncKuduClient is shut down.
- * </p>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
- implements Configurable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
-
- /** Job parameter that specifies the input table. */
- static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
-
- /** Job parameter that specifies if the scanner should cache blocks or not (default: false). */
- static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
-
- /** Job parameter that specifies where the masters are. */
- static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
-
- /** Job parameter that specifies how long we wait for operations to complete (default: 10s). */
- static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
-
- /** Job parameter that specifies the address for the name server. */
- static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
-
- /** Job parameter that specifies the encoded column predicates (may be empty). */
- static final String ENCODED_PREDICATES_KEY =
- "kudu.mapreduce.encoded.predicates";
-
- /**
- * Job parameter that specifies the column projection as a comma-separated list of column names.
- *
- * Not specifying this at all (i.e. setting to null) or setting to the special string
- * '*' means to project all columns.
- *
- * Specifying the empty string means to project no columns (i.e just count the rows).
- */
- static final String COLUMN_PROJECTION_KEY = "kudu.mapreduce.column.projection";
-
- /**
- * The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
- * used in order to not do DNS lookups multiple times for each tablet server.
- */
- private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
-
- private Configuration conf;
- private KuduClient client;
- private KuduTable table;
- private long operationTimeoutMs;
- private String nameServer;
- private boolean cacheBlocks;
- private List<String> projectedCols;
- private List<KuduPredicate> predicates;
-
- @Override
- public List<InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
- try {
- if (table == null) {
- throw new IOException("No table was provided");
- }
-
- KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
- .setProjectedColumnNames(projectedCols)
- .cacheBlocks(cacheBlocks)
- .setTimeout(operationTimeoutMs);
- for (KuduPredicate predicate : predicates) {
- tokenBuilder.addPredicate(predicate);
- }
- List<KuduScanToken> tokens = tokenBuilder.build();
-
- List<InputSplit> splits = new ArrayList<>(tokens.size());
- for (KuduScanToken token : tokens) {
- List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
- for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
- locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
- }
- splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
- }
- return splits;
- } finally {
- shutdownClient();
- }
- }
-
- private void shutdownClient() throws IOException {
- try {
- client.shutdown();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * This method might seem alien, but we do this in order to resolve the hostnames the same way
- * Hadoop does. This ensures we get locality if Kudu is running along MR/YARN.
- * @param host hostname we got from the master
- * @param port port we got from the master
- * @return reverse DNS'd address
- */
- private String reverseDNS(String host, Integer port) {
- String location = this.reverseDNSCacheMap.get(host);
- if (location != null) {
- return location;
- }
- // The below InetSocketAddress creation does a name resolution.
- InetSocketAddress isa = new InetSocketAddress(host, port);
- if (isa.isUnresolved()) {
- LOG.warn("Failed address resolve for: " + isa);
- }
- InetAddress tabletInetAddress = isa.getAddress();
- try {
- location = domainNamePointerToHostName(
- DNS.reverseDns(tabletInetAddress, this.nameServer));
- this.reverseDNSCacheMap.put(host, location);
- } catch (NamingException e) {
- LOG.warn("Cannot resolve the host name for " + tabletInetAddress + " because of " + e);
- location = host;
- }
- return location;
- }
-
- @Override
- public RecordReader<NullWritable, RowResult> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- return new TableRecordReader();
- }
-
- @Override
- public void setConf(Configuration entries) {
- this.conf = new Configuration(entries);
-
- String tableName = conf.get(INPUT_TABLE_KEY);
- String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
- this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
- this.nameServer = conf.get(NAME_SERVER_KEY);
- this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
-
- this.client = new KuduClient.KuduClientBuilder(masterAddresses)
- .defaultOperationTimeoutMs(operationTimeoutMs)
- .build();
- try {
- this.table = client.openTable(tableName);
- } catch (Exception ex) {
- throw new RuntimeException("Could not obtain the table from the master, " +
- "is the master running and is this table created? tablename=" + tableName + " and " +
- "master address= " + masterAddresses, ex);
- }
-
- String projectionConfig = conf.get(COLUMN_PROJECTION_KEY);
- if (projectionConfig == null || projectionConfig.equals("*")) {
- this.projectedCols = null; // project the whole table
- } else if ("".equals(projectionConfig)) {
- this.projectedCols = new ArrayList<>();
- } else {
- this.projectedCols = Lists.newArrayList(Splitter.on(',').split(projectionConfig));
-
- // Verify that the column names are valid -- better to fail with an exception
- // before we submit the job.
- Schema tableSchema = table.getSchema();
- for (String columnName : projectedCols) {
- if (tableSchema.getColumn(columnName) == null) {
- throw new IllegalArgumentException("Unknown column " + columnName);
- }
- }
- }
-
- this.predicates = new ArrayList<>();
- try {
- InputStream is =
- new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
- while (is.available() > 0) {
- this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
- Common.ColumnPredicatePB.parseDelimitedFrom(is)));
- }
- } catch (IOException e) {
- throw new RuntimeException("unable to deserialize predicates from the configuration", e);
- }
- }
-
- /**
- * Given a PTR string generated via reverse DNS lookup, return everything
- * except the trailing period. Example for host.example.com., return
- * host.example.com
- * @param dnPtr a domain name pointer (PTR) string.
- * @return Sanitized hostname with last period stripped off.
- *
- */
- private static String domainNamePointerToHostName(String dnPtr) {
- if (dnPtr == null)
- return null;
- return dnPtr.endsWith(".") ? dnPtr.substring(0, dnPtr.length() - 1) : dnPtr;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
-
- /** The scan token that the split will use to scan the Kudu table. */
- private byte[] scanToken;
-
- /** The start partition key of the scan. Used for sorting splits. */
- private byte[] partitionKey;
-
- /** Tablet server locations which host the tablet to be scanned. */
- private String[] locations;
-
- public TableSplit() { } // Writable
-
- public TableSplit(KuduScanToken token, String[] locations) throws IOException {
- this.scanToken = token.serialize();
- this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
- this.locations = locations;
- }
-
- public byte[] getScanToken() {
- return scanToken;
- }
-
- public byte[] getPartitionKey() {
- return partitionKey;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- // TODO Guesstimate a size
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return locations;
- }
-
- @Override
- public int compareTo(TableSplit other) {
- return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- Bytes.writeByteArray(dataOutput, scanToken);
- Bytes.writeByteArray(dataOutput, partitionKey);
- dataOutput.writeInt(locations.length);
- for (String location : locations) {
- byte[] str = Bytes.fromString(location);
- Bytes.writeByteArray(dataOutput, str);
- }
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- scanToken = Bytes.readByteArray(dataInput);
- partitionKey = Bytes.readByteArray(dataInput);
- locations = new String[dataInput.readInt()];
- for (int i = 0; i < locations.length; i++) {
- byte[] str = Bytes.readByteArray(dataInput);
- locations[i] = Bytes.getString(str);
- }
- }
-
- @Override
- public int hashCode() {
- // We currently just care about the partition key since we're within the same table.
- return Arrays.hashCode(partitionKey);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TableSplit that = (TableSplit) o;
-
- return this.compareTo(that) == 0;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("partitionKey", Bytes.pretty(partitionKey))
- .add("locations", Arrays.toString(locations))
- .toString();
- }
- }
-
- class TableRecordReader extends RecordReader<NullWritable, RowResult> {
-
- private final NullWritable currentKey = NullWritable.get();
- private RowResult currentValue;
- private RowResultIterator iterator;
- private KuduScanner scanner;
- private TableSplit split;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- if (!(inputSplit instanceof TableSplit)) {
- throw new IllegalArgumentException("TableSplit is the only accepted input split");
- }
-
- split = (TableSplit) inputSplit;
-
- try {
- scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- // Calling this now to set iterator.
- tryRefreshIterator();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (!iterator.hasNext()) {
- tryRefreshIterator();
- if (!iterator.hasNext()) {
- // Means we still have the same iterator, we're done
- return false;
- }
- }
- currentValue = iterator.next();
- return true;
- }
-
- /**
- * If the scanner has more rows, get a new iterator else don't do anything.
- * @throws IOException
- */
- private void tryRefreshIterator() throws IOException {
- if (!scanner.hasMoreRows()) {
- return;
- }
- try {
- iterator = scanner.nextRows();
- } catch (Exception e) {
- throw new IOException("Couldn't get scan data", e);
- }
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return currentKey;
- }
-
- @Override
- public RowResult getCurrentValue() throws IOException, InterruptedException {
- return currentValue;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- // TODO Guesstimate progress
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- try {
- scanner.close();
- } catch (Exception e) {
- throw new IOException(e);
- }
- shutdownClient();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
deleted file mode 100644
index 0b919d9..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-package org.kududb.mapreduce;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.StringUtils;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.kududb.client.ColumnRangePredicate;
-import org.kududb.client.KuduPredicate;
-import org.kududb.client.KuduTable;
-import org.kududb.client.Operation;
-
-/**
- * Utility class to setup MR jobs that use Kudu as an input and/or output.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableMapReduceUtil {
- // Mostly lifted from HBase's TableMapReduceUtil
-
- private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class);
-
- /**
- * Doesn't need instantiation
- */
- private KuduTableMapReduceUtil() { }
-
-
- /**
- * Base class for MR I/O formats, contains the common configurations.
- */
- private static abstract class AbstractMapReduceConfigurator<S> {
- protected final Job job;
- protected final String table;
-
- protected boolean addDependencies = true;
-
- /**
- * Constructor for the required fields to configure.
- * @param job a job to configure
- * @param table a string that contains the name of the table to read from
- */
- private AbstractMapReduceConfigurator(Job job, String table) {
- this.job = job;
- this.table = table;
- }
-
- /**
- * Sets whether this job should add Kudu's dependencies to the distributed cache. Turned on
- * by default.
- * @param addDependencies a boolean that says if we should add the dependencies
- * @return this instance
- */
- @SuppressWarnings("unchecked")
- public S addDependencies(boolean addDependencies) {
- this.addDependencies = addDependencies;
- return (S) this;
- }
-
- /**
- * Configures the job using the passed parameters.
- * @throws IOException If addDependencies is enabled and a problem is encountered reading
- * files on the filesystem
- */
- public abstract void configure() throws IOException;
- }
-
- /**
- * Builder-like class that sets up the required configurations and classes to write to Kudu.
- * <p>
- * Use either child classes when configuring the table output format.
- */
- private static abstract class AbstractTableOutputFormatConfigurator
- <S extends AbstractTableOutputFormatConfigurator<? super S>>
- extends AbstractMapReduceConfigurator<S> {
-
- protected String masterAddresses;
- protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-
- /**
- * {@inheritDoc}
- */
- private AbstractTableOutputFormatConfigurator(Job job, String table) {
- super(job, table);
- }
-
- /**
- * {@inheritDoc}
- */
- public void configure() throws IOException {
- job.setOutputFormatClass(KuduTableOutputFormat.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Operation.class);
-
- Configuration conf = job.getConfiguration();
- conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
- conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, table);
- conf.setLong(KuduTableOutputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
- if (addDependencies) {
- addDependencyJars(job);
- }
- }
- }
-
- /**
- * Builder-like class that sets up the required configurations and classes to read from Kudu.
- * By default, block caching is disabled.
- * <p>
- * Use either child classes when configuring the table input format.
- */
- private static abstract class AbstractTableInputFormatConfigurator
- <S extends AbstractTableInputFormatConfigurator<? super S>>
- extends AbstractMapReduceConfigurator<S> {
-
- protected String masterAddresses;
- protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
- protected final String columnProjection;
- protected boolean cacheBlocks;
- protected List<KuduPredicate> predicates = new ArrayList<>();
-
- /**
- * Constructor for the required fields to configure.
- * @param job a job to configure
- * @param table a string that contains the name of the table to read from
- * @param columnProjection a string containing a comma-separated list of columns to read.
- * It can be null in which case we read empty rows
- */
- private AbstractTableInputFormatConfigurator(Job job, String table, String columnProjection) {
- super(job, table);
- this.columnProjection = columnProjection;
- }
-
- /**
- * Sets the block caching configuration for the scanners. Turned off by default.
- * @param cacheBlocks whether the job should use scanners that cache blocks.
- * @return this instance
- */
- public S cacheBlocks(boolean cacheBlocks) {
- this.cacheBlocks = cacheBlocks;
- return (S) this;
- }
-
- /**
- * Configures the job with all the passed parameters.
- * @throws IOException If addDependencies is enabled and a problem is encountered reading
- * files on the filesystem
- */
- public void configure() throws IOException {
- job.setInputFormatClass(KuduTableInputFormat.class);
-
- Configuration conf = job.getConfiguration();
-
- conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, masterAddresses);
- conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, table);
- conf.setLong(KuduTableInputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
- conf.setBoolean(KuduTableInputFormat.SCAN_CACHE_BLOCKS, cacheBlocks);
-
- if (columnProjection != null) {
- conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
- }
-
- conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
-
- if (addDependencies) {
- addDependencyJars(job);
- }
- }
- }
-
- /**
- * Returns the provided predicates as a Base64 encoded string.
- * @param predicates the predicates to encode
- * @return the encoded predicates
- */
- static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- for (KuduPredicate predicate : predicates) {
- predicate.toPB().writeDelimitedTo(baos);
- }
- return Base64.encodeBase64String(baos.toByteArray());
- }
-
-
- /**
- * Table output format configurator to use to specify the parameters directly.
- */
- public static class TableOutputFormatConfigurator
- extends AbstractTableOutputFormatConfigurator<TableOutputFormatConfigurator> {
-
- /**
- * Constructor for the required fields to configure.
- * @param job a job to configure
- * @param table a string that contains the name of the table to read from
- * @param masterAddresses a comma-separated list of masters' hosts and ports
- */
- public TableOutputFormatConfigurator(Job job, String table, String masterAddresses) {
- super(job, table);
- this.masterAddresses = masterAddresses;
- }
-
- /**
- * Sets the timeout for all the operations. The default is 10 seconds.
- * @param operationTimeoutMs a long that represents the timeout for operations to complete,
- * must be a positive value or 0
- * @return this instance
- * @throws IllegalArgumentException if the operation timeout is lower than 0
- */
- public TableOutputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
- if (operationTimeoutMs < 0) {
- throw new IllegalArgumentException("The operation timeout must be => 0, " +
- "passed value is: " + operationTimeoutMs);
- }
- this.operationTimeoutMs = operationTimeoutMs;
- return this;
- }
- }
-
- /**
- * Table output format that uses a {@link CommandLineParser} in order to set the
- * master config and the operation timeout.
- */
- public static class TableOutputFormatConfiguratorWithCommandLineParser extends
- AbstractTableOutputFormatConfigurator<TableOutputFormatConfiguratorWithCommandLineParser> {
-
- /**
- * {@inheritDoc}
- */
- public TableOutputFormatConfiguratorWithCommandLineParser(Job job, String table) {
- super(job, table);
- CommandLineParser parser = new CommandLineParser(job.getConfiguration());
- this.masterAddresses = parser.getMasterAddresses();
- this.operationTimeoutMs = parser.getOperationTimeoutMs();
- }
- }
-
- /**
- * Table input format configurator to use to specify the parameters directly.
- */
- public static class TableInputFormatConfigurator
- extends AbstractTableInputFormatConfigurator<TableInputFormatConfigurator> {
-
- /**
- * Constructor for the required fields to configure.
- * @param job a job to configure
- * @param table a string that contains the name of the table to read from
- * @param columnProjection a string containing a comma-separated list of columns to read.
- * It can be null in which case we read empty rows
- * @param masterAddresses a comma-separated list of masters' hosts and ports
- */
- public TableInputFormatConfigurator(Job job, String table, String columnProjection,
- String masterAddresses) {
- super(job, table, columnProjection);
- this.masterAddresses = masterAddresses;
- }
-
- /**
- * Sets the timeout for all the operations. The default is 10 seconds.
- * @param operationTimeoutMs a long that represents the timeout for operations to complete,
- * must be a positive value or 0
- * @return this instance
- * @throws IllegalArgumentException if the operation timeout is lower than 0
- */
- public TableInputFormatConfigurator operationTimeoutMs(long operationTimeoutMs) {
- if (operationTimeoutMs < 0) {
- throw new IllegalArgumentException("The operation timeout must be => 0, " +
- "passed value is: " + operationTimeoutMs);
- }
- this.operationTimeoutMs = operationTimeoutMs;
- return this;
- }
-
- /**
- * Adds a new predicate that will be pushed down to all the tablets.
- * @param predicate a predicate to add
- * @return this instance
- * @deprecated use {@link #addPredicate}
- */
- @Deprecated
- public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
- return addPredicate(predicate.toKuduPredicate());
- }
-
- /**
- * Adds a new predicate that will be pushed down to all the tablets.
- * @param predicate a predicate to add
- * @return this instance
- */
- public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
- this.predicates.add(predicate);
- return this;
- }
- }
-
- /**
- * Table input format that uses a {@link CommandLineParser} in order to set the
- * master config and the operation timeout.
- * This version cannot set column range predicates.
- */
- public static class TableInputFormatConfiguratorWithCommandLineParser extends
- AbstractTableInputFormatConfigurator<TableInputFormatConfiguratorWithCommandLineParser> {
-
- /**
- * {@inheritDoc}
- */
- public TableInputFormatConfiguratorWithCommandLineParser(Job job,
- String table,
- String columnProjection) {
- super(job, table, columnProjection);
- CommandLineParser parser = new CommandLineParser(job.getConfiguration());
- this.masterAddresses = parser.getMasterAddresses();
- this.operationTimeoutMs = parser.getOperationTimeoutMs();
- }
- }
-
- /**
- * Use this method when setting up a task to get access to the KuduTable in order to create
- * Inserts, Updates, and Deletes.
- * @param context Map context
- * @return The kudu table object as setup by the output format
- */
- @SuppressWarnings("rawtypes")
- public static KuduTable getTableFromContext(TaskInputOutputContext context) {
- String multitonKey = context.getConfiguration().get(KuduTableOutputFormat.MULTITON_KEY);
- return KuduTableOutputFormat.getKuduTable(multitonKey);
- }
-
- /**
- * Add the Kudu dependency jars as well as jars for any of the configured
- * job classes to the job configuration, so that JobClient will ship them
- * to the cluster and add them to the DistributedCache.
- */
- public static void addDependencyJars(Job job) throws IOException {
- addKuduDependencyJars(job.getConfiguration());
- try {
- addDependencyJars(job.getConfiguration(),
- // when making changes here, consider also mapred.TableMapReduceUtil
- // pull job classes
- job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(),
- job.getInputFormatClass(),
- job.getOutputKeyClass(),
- job.getOutputValueClass(),
- job.getOutputFormatClass(),
- job.getPartitionerClass(),
- job.getCombinerClass());
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Add the jars containing the given classes to the job's configuration
- * such that JobClient will ship them to the cluster and add them to
- * the DistributedCache.
- */
- public static void addDependencyJars(Configuration conf,
- Class<?>... classes) throws IOException {
-
- FileSystem localFs = FileSystem.getLocal(conf);
- Set<String> jars = new HashSet<String>();
- // Add jars that are already in the tmpjars variable
- jars.addAll(conf.getStringCollection("tmpjars"));
-
- // add jars as we find them to a map of contents jar name so that we can avoid
- // creating new jars for classes that have already been packaged.
- Map<String, String> packagedClasses = new HashMap<String, String>();
-
- // Add jars containing the specified classes
- for (Class<?> clazz : classes) {
- if (clazz == null) continue;
-
- Path path = findOrCreateJar(clazz, localFs, packagedClasses);
- if (path == null) {
- LOG.warn("Could not find jar for class " + clazz +
- " in order to ship it to the cluster.");
- continue;
- }
- if (!localFs.exists(path)) {
- LOG.warn("Could not validate jar file " + path + " for class "
- + clazz);
- continue;
- }
- jars.add(path.toString());
- }
- if (jars.isEmpty()) return;
-
- conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
- }
-
- /**
- * Add Kudu and its dependencies (only) to the job configuration.
- * <p>
- * This is intended as a low-level API, facilitating code reuse between this
- * class and its mapred counterpart. It also of use to external tools that
- * need to build a MapReduce job that interacts with Kudu but want
- * fine-grained control over the jars shipped to the cluster.
- * </p>
- * @param conf The Configuration object to extend with dependencies.
- * @see KuduTableMapReduceUtil
- * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
- */
- public static void addKuduDependencyJars(Configuration conf) throws IOException {
- addDependencyJars(conf,
- // explicitly pull a class from each module
- Operation.class, // kudu-client
- KuduTableMapReduceUtil.class, // kudu-mapreduce
- // pull necessary dependencies
- com.stumbleupon.async.Deferred.class);
- }
-
- /**
- * Finds the Jar for a class or creates it if it doesn't exist. If the class
- * is in a directory in the classpath, it creates a Jar on the fly with the
- * contents of the directory and returns the path to that Jar. If a Jar is
- * created, it is created in the system temporary directory. Otherwise,
- * returns an existing jar that contains a class of the same name. Maintains
- * a mapping from jar contents to the tmp jar created.
- * @param my_class the class to find.
- * @param fs the FileSystem with which to qualify the returned path.
- * @param packagedClasses a map of class name to path.
- * @return a jar file that contains the class.
- * @throws IOException
- */
- @SuppressWarnings("deprecation")
- private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
- Map<String, String> packagedClasses)
- throws IOException {
- // attempt to locate an existing jar for the class.
- String jar = findContainingJar(my_class, packagedClasses);
- if (null == jar || jar.isEmpty()) {
- jar = JarFinder.getJar(my_class);
- updateMap(jar, packagedClasses);
- }
-
- if (null == jar || jar.isEmpty()) {
- return null;
- }
-
- LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
- return new Path(jar).makeQualified(fs);
- }
-
- /**
- * Find a jar that contains a class of the same name, if any. It will return
- * a jar file, even if that is not the first thing on the class path that
- * has a class with the same name. Looks first on the classpath and then in
- * the <code>packagedClasses</code> map.
- * @param my_class the class to find.
- * @return a jar file that contains the class, or null.
- * @throws IOException
- */
- private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
- throws IOException {
- ClassLoader loader = my_class.getClassLoader();
- String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-
- // first search the classpath
- for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
- URL url = itr.nextElement();
- if ("jar".equals(url.getProtocol())) {
- String toReturn = url.getPath();
- if (toReturn.startsWith("file:")) {
- toReturn = toReturn.substring("file:".length());
- }
- // URLDecoder is a misnamed class, since it actually decodes
- // x-www-form-urlencoded MIME type rather than actual
- // URL encoding (which the file path has). Therefore it would
- // decode +s to ' 's which is incorrect (spaces are actually
- // either unencoded or encoded as "%20"). Replace +s first, so
- // that they are kept sacred during the decoding process.
- toReturn = toReturn.replaceAll("\\+", "%2B");
- toReturn = URLDecoder.decode(toReturn, "UTF-8");
- return toReturn.replaceAll("!.*$", "");
- }
- }
-
- // now look in any jars we've packaged using JarFinder. Returns null when
- // no jar is found.
- return packagedClasses.get(class_file);
- }
-
- /**
- * Add entries to <code>packagedClasses</code> corresponding to class files
- * contained in <code>jar</code>.
- * @param jar The jar who's content to list.
- * @param packagedClasses map[class -> jar]
- */
- private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
- if (null == jar || jar.isEmpty()) {
- return;
- }
- ZipFile zip = null;
- try {
- zip = new ZipFile(jar);
- for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
- ZipEntry entry = iter.nextElement();
- if (entry.getName().endsWith("class")) {
- packagedClasses.put(entry.getName(), jar);
- }
- }
- } finally {
- if (null != zip) zip.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
deleted file mode 100644
index 8af750b..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputCommitter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-
-import java.io.IOException;
-
-/**
- * Small committer class that does not do anything.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableOutputCommitter extends OutputCommitter {
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
-
- }
-
- @Override
- public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
- return false;
- }
-
- @Override
- public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
- }
-
- @Override
- public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
deleted file mode 100644
index e80b73f..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableOutputFormat.java
+++ /dev/null
@@ -1,215 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.*;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p>
- * Use {@link
- * KuduTableMapReduceUtil.TableOutputFormatConfigurator}
- * to correctly setup this output format, then {@link
- * KuduTableMapReduceUtil#getTableFromContext(org.apache.hadoop.mapreduce.TaskInputOutputContext)}
- * to get a KuduTable.
- * </p>
- *
- * <p>
- * Hadoop doesn't have the concept of "closing" the output format so in order to release the
- * resources we assume that once either
- * {@link #checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link TableRecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)}
- * have been called that the object won't be used again and the KuduClient is shut down.
- * </p>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation>
- implements Configurable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KuduTableOutputFormat.class);
-
- /** Job parameter that specifies the output table. */
- static final String OUTPUT_TABLE_KEY = "kudu.mapreduce.output.table";
-
- /** Job parameter that specifies where the masters are */
- static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.addresses";
-
- /** Job parameter that specifies how long we wait for operations to complete */
- static final String OPERATION_TIMEOUT_MS_KEY = "kudu.mapreduce.operation.timeout.ms";
-
- /** Number of rows that are buffered before flushing to the tablet server */
- static final String BUFFER_ROW_COUNT_KEY = "kudu.mapreduce.buffer.row.count";
-
- /**
- * Job parameter that specifies which key is to be used to reach the KuduTableOutputFormat
- * belonging to the caller
- */
- static final String MULTITON_KEY = "kudu.mapreduce.multitonkey";
-
- /**
- * This multiton is used so that the tasks using this output format/record writer can find
- * their KuduTable without having a direct dependency on this class,
- * with the additional complexity that the output format cannot be shared between threads.
- */
- private static final ConcurrentHashMap<String, KuduTableOutputFormat> MULTITON = new
- ConcurrentHashMap<String, KuduTableOutputFormat>();
-
- /**
- * This counter helps indicate which task log to look at since rows that weren't applied will
- * increment this counter.
- */
- public enum Counters { ROWS_WITH_ERRORS }
-
- private Configuration conf = null;
-
- private KuduClient client;
- private KuduTable table;
- private KuduSession session;
- private long operationTimeoutMs;
-
- @Override
- public void setConf(Configuration entries) {
- this.conf = new Configuration(entries);
-
- String masterAddress = this.conf.get(MASTER_ADDRESSES_KEY);
- String tableName = this.conf.get(OUTPUT_TABLE_KEY);
- this.operationTimeoutMs = this.conf.getLong(OPERATION_TIMEOUT_MS_KEY,
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
- int bufferSpace = this.conf.getInt(BUFFER_ROW_COUNT_KEY, 1000);
-
- this.client = new KuduClient.KuduClientBuilder(masterAddress)
- .defaultOperationTimeoutMs(operationTimeoutMs)
- .build();
- try {
- this.table = client.openTable(tableName);
- } catch (Exception ex) {
- throw new RuntimeException("Could not obtain the table from the master, " +
- "is the master running and is this table created? tablename=" + tableName + " and " +
- "master address= " + masterAddress, ex);
- }
- this.session = client.newSession();
- this.session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
- this.session.setMutationBufferSpace(bufferSpace);
- this.session.setIgnoreAllDuplicateRows(true);
- String multitonKey = String.valueOf(Thread.currentThread().getId());
- assert(MULTITON.get(multitonKey) == null);
- MULTITON.put(multitonKey, this);
- entries.set(MULTITON_KEY, multitonKey);
- }
-
- private void shutdownClient() throws IOException {
- try {
- client.shutdown();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- public static KuduTable getKuduTable(String multitonKey) {
- return MULTITON.get(multitonKey).getKuduTable();
- }
-
- private KuduTable getKuduTable() {
- return this.table;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public RecordWriter<NullWritable, Operation> getRecordWriter(TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- return new TableRecordWriter(this.session);
- }
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
- shutdownClient();
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws
- IOException, InterruptedException {
- return new KuduTableOutputCommitter();
- }
-
- protected class TableRecordWriter extends RecordWriter<NullWritable, Operation> {
-
- private final AtomicLong rowsWithErrors = new AtomicLong();
- private final KuduSession session;
-
- public TableRecordWriter(KuduSession session) {
- this.session = session;
- }
-
- @Override
- public void write(NullWritable key, Operation operation)
- throws IOException, InterruptedException {
- try {
- session.apply(operation);
- } catch (Exception e) {
- throw new IOException("Encountered an error while writing", e);
- }
- }
-
- @Override
- public void close(TaskAttemptContext taskAttemptContext) throws IOException,
- InterruptedException {
- try {
- processRowErrors(session.close());
- shutdownClient();
- } catch (Exception e) {
- throw new IOException("Encountered an error while closing this task", e);
- } finally {
- if (taskAttemptContext != null) {
- // This is the only place where we have access to the context in the record writer,
- // so set the counter here.
- taskAttemptContext.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
- }
- }
- }
-
- private void processRowErrors(List<OperationResponse> responses) {
- List<RowError> errors = OperationResponse.collectErrors(responses);
- if (!errors.isEmpty()) {
- int rowErrorsCount = errors.size();
- rowsWithErrors.addAndGet(rowErrorsCount);
- LOG.warn("Got per errors for " + rowErrorsCount + " rows, " +
- "the first one being " + errors.get(0).getStatus());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
deleted file mode 100644
index 7cf3ada..0000000
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/TableReducer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.Operation;
-import org.apache.hadoop.mapreduce.Reducer;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
- extends Reducer<KEYIN, VALUEIN, KEYOUT, Operation> {
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
new file mode 100644
index 0000000..1e2cb41
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/HadoopTestingUtility.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.mapreduce;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * This class is analogous to HBaseTestingUtility, except that we only need it for the MR tests.
+ */
+public class HadoopTestingUtility {
+
+ private static final Log LOG = LogFactory.getLog(HadoopTestingUtility.class);
+
+ private File testDir;
+
+ private final Configuration conf = new Configuration();
+
+ /** System property key used to look up the base test directory. */
+ public static final String BASE_TEST_DIRECTORY_KEY = "test.build.data.basedirectory";
+
+ /** Default base directory for test output, used when BASE_TEST_DIRECTORY_KEY is unset. */
+ private static final String DEFAULT_BASE_TEST_DIRECTORY = "target/mr-data";
+
+ /** Returns the Configuration instance owned by this utility. */
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ /**
+ * Sets up a temporary directory for the test to run in. Call cleanup() at the end of your
+ * tests to remove it. Only the first call creates the directory; later calls return it.
+ * @param testName Will be used to build a part of the directory name for the test
+ * @param conf configuration whose hadoop.tmp.dir is set under the test dir (first call only)
+ * @return Where the test is homed
+ */
+ public File setupAndGetTestDir(String testName, Configuration conf) {
+ if (this.testDir != null) {
+ return this.testDir;
+ }
+ Path testPath = new Path(getBaseTestDir(), testName + System.currentTimeMillis());
+ this.testDir = new File(testPath.toString()).getAbsoluteFile();
+ if (!this.testDir.mkdirs() && !this.testDir.isDirectory()) {
+ LOG.warn("Could not create test directory " + this.testDir);
+ }
+ // Set this property so when mapreduce jobs run, they will use this as their home dir.
+ System.setProperty("test.build.dir", this.testDir.toString());
+ System.setProperty("hadoop.home.dir", this.testDir.toString());
+ conf.set("hadoop.tmp.dir", this.testDir.toString() + "/mapred");
+
+ LOG.info("Test configured to write to " + this.testDir);
+ return this.testDir;
+ }
+
+ private Path getBaseTestDir() {
+ String pathName = System.getProperty(BASE_TEST_DIRECTORY_KEY, DEFAULT_BASE_TEST_DIRECTORY);
+ return new Path(pathName);
+ }
+
+ /** Calls FileSystem.closeAll(), then deletes the test directory if one was set up. */
+ public void cleanup() throws IOException {
+ FileSystem.closeAll();
+ if (this.testDir != null) {
+ delete(this.testDir);
+ }
+ }
+
+ private void delete(File dir) throws IOException {
+ if (dir == null || !dir.exists()) {
+ return;
+ }
+ try {
+ FileUtils.deleteDirectory(dir);
+ } catch (IOException ex) {
+ LOG.warn("Failed to delete " + dir.getAbsolutePath(), ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
new file mode 100644
index 0000000..3d04043
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class ITInputFormatJob extends BaseKuduTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ITInputFormatJob.class);
+
+ private static final String TABLE_NAME =
+ ITInputFormatJob.class.getName() + "-" + System.currentTimeMillis();
+
+ private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+ /** Counter enumeration to count the actual rows. */
+ private enum Counters { ROWS }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ BaseKuduTest.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ BaseKuduTest.tearDownAfterClass();
+ } finally {
+ HADOOP_UTIL.cleanup();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ createFourTabletsTableWithNineRows(TABLE_NAME);
+
+ // Per-test scratch dir; setupAndGetTestDir also sets hadoop.tmp.dir on conf.
+ Configuration conf = new Configuration();
+ HADOOP_UTIL.setupAndGetTestDir(ITInputFormatJob.class.getName(), conf);
+
+ createAndTestJob(conf, new ArrayList<KuduPredicate>(), 9);
+
+ KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+ basicSchema.getColumnByIndex(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 20);
+ createAndTestJob(conf, Lists.newArrayList(pred1), 6);
+
+ KuduPredicate pred2 = KuduPredicate.newComparisonPredicate(
+ basicSchema.getColumnByIndex(2), KuduPredicate.ComparisonOp.LESS_EQUAL, 1);
+ createAndTestJob(conf, Lists.newArrayList(pred1, pred2), 2);
+ }
+
+ /** Runs a map-only job reading TABLE_NAME with the given predicates; checks the row count. */
+ @SuppressWarnings("deprecation") // the Job(Configuration, String) constructor is deprecated
+ private void createAndTestJob(Configuration conf, List<KuduPredicate> predicates,
+ int expectedCount) throws Exception {
+ String jobName = ITInputFormatJob.class.getName();
+ Job job = new Job(conf, jobName);
+
+ Class<TestMapperTableInput> mapperClass = TestMapperTableInput.class;
+ job.setJarByClass(mapperClass);
+ job.setMapperClass(mapperClass);
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ KuduTableMapReduceUtil.TableInputFormatConfigurator configurator =
+ new KuduTableMapReduceUtil.TableInputFormatConfigurator(
+ job,
+ TABLE_NAME,
+ "*",
+ getMasterAddresses())
+ .operationTimeoutMs(DEFAULT_SLEEP)
+ .addDependencies(false)
+ .cacheBlocks(false);
+ for (KuduPredicate predicate : predicates) {
+ configurator.addPredicate(predicate);
+ }
+ configurator.configure();
+
+ assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+ assertEquals(expectedCount, job.getCounters().findCounter(Counters.ROWS).getValue());
+ }
+
+ /**
+ * Mapper that counts every row it receives and logs it.
+ */
+ static class TestMapperTableInput extends
+ Mapper<NullWritable, RowResult, NullWritable, NullWritable> {
+
+ @Override
+ protected void map(NullWritable key, RowResult value, Context context) throws IOException,
+ InterruptedException {
+ context.getCounter(Counters.ROWS).increment(1);
+ LOG.info(value.toStringLongFormat()); // useful for visual debugging
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
new file mode 100644
index 0000000..ff4d81a
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.kududb.Schema;
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class ITKuduTableInputFormat extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITKuduTableInputFormat.class.getName() + "-" + System.currentTimeMillis();
+
+  /**
+   * Inserts a single row, then reads it back through {@link KuduTableInputFormat} using several
+   * column projections, an unknown column, and a predicate that filters the row out.
+   */
+  @Test
+  public void test() throws Exception {
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+
+    KuduTable table = openTable(TABLE_NAME);
+    Schema schema = getBasicSchema();
+    Insert insert = table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addInt(0, 1);
+    row.addInt(1, 2);
+    row.addInt(2, 3);
+    row.addString(3, "a string");
+    row.addBoolean(4, true);
+    AsyncKuduSession session = client.newSession();
+    session.apply(insert).join(DEFAULT_SLEEP);
+    session.close().join(DEFAULT_SLEEP);
+
+    // Test getting all the columns back
+    RecordReader<NullWritable, RowResult> reader = createRecordReader("*", null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(5, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertFalse(reader.nextKeyValue());
+    // Every createRecordReader call builds a fresh input format; close each reader so whatever
+    // it holds open is released before the next one is created.
+    reader.close();
+
+    // Test getting two columns back
+    reader = createRecordReader(schema.getColumnByIndex(3).getName() + "," +
+        schema.getColumnByIndex(2).getName(), null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(2, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertEquals("a string", reader.getCurrentValue().getString(0));
+    assertEquals(3, reader.getCurrentValue().getInt(1));
+    try {
+      reader.getCurrentValue().getString(2);
+      fail("Should only be getting 2 columns back");
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+    reader.close();
+
+    // Test getting one column back
+    reader = createRecordReader(schema.getColumnByIndex(1).getName(), null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(1, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertEquals(2, reader.getCurrentValue().getInt(0));
+    try {
+      reader.getCurrentValue().getString(1);
+      fail("Should only be getting 1 column back");
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+    reader.close();
+
+    // Test getting empty rows back
+    reader = createRecordReader("", null);
+    assertTrue(reader.nextKeyValue());
+    assertEquals(0, reader.getCurrentValue().getColumnProjection().getColumnCount());
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+
+    // Test projecting a column that doesn't exist; this must be rejected
+    try {
+      createRecordReader("unknown", null);
+      fail("Should not be able to scan a column that doesn't exist");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    // Test using a predicate that filters the row out.
+    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+        schema.getColumnByIndex(1), KuduPredicate.ComparisonOp.GREATER_EQUAL, 3);
+    reader = createRecordReader("*", Lists.newArrayList(pred1));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+  }
+
+  /**
+   * Builds and initializes a record reader over the single split of {@code TABLE_NAME}.
+   *
+   * @param columnProjection comma-separated column names, {@code "*"} for every column, or
+   *     {@code null} to leave the projection key unset
+   * @param predicates predicates to encode into the configuration, or {@code null} for none
+   * @return an initialized reader; the caller is responsible for closing it
+   */
+  private RecordReader<NullWritable, RowResult> createRecordReader(String columnProjection,
+      List<KuduPredicate> predicates) throws IOException, InterruptedException {
+    KuduTableInputFormat input = new KuduTableInputFormat();
+    Configuration conf = new Configuration();
+    conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
+    conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, TABLE_NAME);
+    if (columnProjection != null) {
+      conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
+    }
+    if (predicates != null) {
+      String encodedPredicates = KuduTableMapReduceUtil.base64EncodePredicates(predicates);
+      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, encodedPredicates);
+    }
+    input.setConf(conf);
+    List<InputSplit> splits = input.getSplits(null);
+
+    // We need to re-create the input format to reconnect the client.
+    input = new KuduTableInputFormat();
+    input.setConf(conf);
+    RecordReader<NullWritable, RowResult> reader = input.createRecordReader(null, null);
+    reader.initialize(Iterables.getOnlyElement(splits), null);
+    return reader;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
new file mode 100644
index 0000000..86452ed
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ITKuduTableOutputFormat extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITKuduTableOutputFormat.class.getName() + "-" + System.currentTimeMillis();
+
+  /** Delegates class-level setup to {@link BaseKuduTest#setUpBeforeClass()}. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  /**
+   * Pushes one insert through a {@link KuduTableOutputFormat} record writer and verifies that
+   * exactly one row can then be scanned from the table.
+   */
+  @Test
+  public void test() throws Exception {
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+
+    KuduTableOutputFormat outputFormat = new KuduTableOutputFormat();
+    Configuration jobConf = new Configuration();
+    jobConf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
+    jobConf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME);
+    outputFormat.setConf(jobConf);
+
+    // After setConf, the configuration is expected to carry the multiton key under which the
+    // output format registered its table.
+    KuduTable outputTable =
+        KuduTableOutputFormat.getKuduTable(jobConf.get(KuduTableOutputFormat.MULTITON_KEY));
+    assertNotNull(outputTable);
+
+    Insert op = outputTable.newInsert();
+    PartialRow newRow = op.getRow();
+    newRow.addInt(0, 1);
+    newRow.addInt(1, 2);
+    newRow.addInt(2, 3);
+    newRow.addString(3, "a string");
+    newRow.addBoolean(4, true);
+
+    RecordWriter<NullWritable, Operation> writer = outputFormat.getRecordWriter(null);
+    writer.write(NullWritable.get(), op);
+    writer.close(null);
+
+    assertEquals(1, countRowsInScan(client.newScannerBuilder(outputTable).build()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
new file mode 100644
index 0000000..dff2400
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.kududb.client.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class ITOutputFormatJob extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITOutputFormatJob.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  /**
+   * Runs a map-only job that turns each line of a two-line text file into a Kudu insert via
+   * {@link KuduTableOutputFormat}, then checks that two rows were written.
+   */
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+        HADOOP_UTIL.setupAndGetTestDir(ITOutputFormatJob.class.getName(), conf).getAbsolutePath();
+    String jobName = ITOutputFormatJob.class.getName();
+    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
+    Job job = Job.getInstance(conf, jobName);
+
+    // Create a 2 lines input file
+    File data = new File(testHome, "data.txt");
+    writeDataFile(data);
+    FileInputFormat.setInputPaths(job, data.toString());
+
+    // Configure the job to map the file and write to kudu, without reducers
+    Class<TestMapperTableOutput> mapperClass = TestMapperTableOutput.class;
+    job.setJarByClass(mapperClass);
+    job.setMapperClass(mapperClass);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setNumReduceTasks(0);
+    new KuduTableMapReduceUtil.TableOutputFormatConfigurator(
+        job,
+        TABLE_NAME,
+        getMasterAddresses())
+        .operationTimeoutMs(DEFAULT_SLEEP)
+        .addDependencies(false)
+        .configure();
+
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    // Make sure the data's there
+    KuduTable table = openTable(TABLE_NAME);
+    AsyncKuduScanner.AsyncKuduScannerBuilder builder =
+        client.newScannerBuilder(table);
+    assertEquals(2, countRowsInScan(builder.build()));
+  }
+
+  /**
+   * Simple Mapper that writes one row per input line. Column 0 holds the key supplied by
+   * {@link TextInputFormat}, which is the line's byte offset within the file (not its line
+   * number). The STRING column holds the text of that line.
+   */
+  static class TestMapperTableOutput extends
+      Mapper<LongWritable, Text, NullWritable, Operation> {
+
+    private KuduTable table;
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
+        InterruptedException {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addInt(0, (int) key.get());
+      row.addInt(1, 1);
+      row.addInt(2, 2);
+      row.addString(3, value.toString());
+      row.addBoolean(4, true);
+      context.write(NullWritable.get(), insert);
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      super.setup(context);
+      table = KuduTableMapReduceUtil.getTableFromContext(context);
+    }
+  }
+
+  /**
+   * Writes the two-line job input ("VALUE1" and "VALUE2") to {@code data}.
+   *
+   * @param data the file to create or overwrite
+   * @throws IOException if the file cannot be written
+   */
+  private void writeDataFile(File data) throws IOException {
+    FileOutputStream fos = new FileOutputStream(data);
+    try {
+      // Encode explicitly so the input bytes do not depend on the platform default charset.
+      fos.write("VALUE1\nVALUE2\n".getBytes("UTF-8"));
+    } finally {
+      // Close even if the write fails so the file handle is not leaked.
+      fos.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
new file mode 100644
index 0000000..3801a0c
--- /dev/null
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/TestJarFinder.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.mapreduce;
+
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.text.MessageFormat;
+import java.util.Properties;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+/**
+ * This file was forked from hbase/branches/master@4ce6f48.
+ */
+public class TestJarFinder {
+
+  @Test
+  public void testJar() throws Exception {
+    // Picking a class that is for sure in a JAR in the classpath
+    String jar = JarFinder.getJar(LogFactory.class);
+    Assert.assertTrue(new File(jar).exists());
+  }
+
+  /**
+   * Recursively deletes {@code file}. Paths that do not exist are ignored.
+   *
+   * @throws IllegalArgumentException if the absolute path is shorter than 5 characters; this
+   *     guards against deleting something like a filesystem root by mistake
+   * @throws RuntimeException if an existing path cannot be deleted
+   */
+  private static void delete(File file) throws IOException {
+    if (file.getAbsolutePath().length() < 5) {
+      throw new IllegalArgumentException(
+          MessageFormat.format("Path [{0}] is too short, not deleting",
+              file.getAbsolutePath()));
+    }
+    if (file.exists()) {
+      if (file.isDirectory()) {
+        File[] children = file.listFiles();
+        if (children != null) {
+          for (File child : children) {
+            delete(child);
+          }
+        }
+      }
+      if (!file.delete()) {
+        throw new RuntimeException(
+            MessageFormat.format("Could not delete path [{0}]",
+                file.getAbsolutePath()));
+      }
+    }
+  }
+
+  @Test
+  public void testExpandedClasspath() throws Exception {
+    // Picking a class that is for sure in a directory in the classpath
+    // In this case, the JAR is created on the fly
+    String jar = JarFinder.getJar(TestJarFinder.class);
+    Assert.assertTrue(new File(jar).exists());
+  }
+
+  /**
+   * Jars a directory that already contains META-INF/MANIFEST.MF and checks that the resulting
+   * jar exposes a manifest.
+   */
+  @Test
+  public void testExistingManifest() throws Exception {
+    File dir = createCleanTestDir("-testExistingManifest");
+
+    File metaInfDir = new File(dir, "META-INF");
+    metaInfDir.mkdirs();
+    File manifestFile = new File(metaInfDir, "MANIFEST.MF");
+    Manifest manifest = new Manifest();
+    OutputStream os = new FileOutputStream(manifestFile);
+    try {
+      manifest.write(os);
+    } finally {
+      os.close();
+    }
+
+    writeEmptyProperties(dir);
+    assertJarHasManifest(dir);
+  }
+
+  /**
+   * Jars a directory without a manifest and checks that the resulting jar still exposes one.
+   */
+  @Test
+  public void testNoManifest() throws Exception {
+    File dir = createCleanTestDir("-testNoManifest");
+    writeEmptyProperties(dir);
+    assertJarHasManifest(dir);
+  }
+
+  /**
+   * Deletes and then re-creates a test directory under {@code test.build.dir} (default
+   * {@code target/test-dir}), named after this class plus {@code suffix}.
+   */
+  private static File createCleanTestDir(String suffix) throws IOException {
+    File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+        TestJarFinder.class.getName() + suffix);
+    delete(dir);
+    dir.mkdirs();
+    return dir;
+  }
+
+  /** Writes a {@code props.properties} file with no entries into {@code dir}. */
+  private static void writeEmptyProperties(File dir) throws IOException {
+    File propsFile = new File(dir, "props.properties");
+    Writer writer = new FileWriter(propsFile);
+    try {
+      new Properties().store(writer, "");
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Jars {@code dir} in memory with {@code JarFinder.jarDir} and asserts the jar has a manifest. */
+  private static void assertJarHasManifest(File dir) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    JarOutputStream zos = new JarOutputStream(baos);
+    JarFinder.jarDir(dir, "", zos);
+    JarInputStream jis =
+        new JarInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    try {
+      Assert.assertNotNull(jis.getManifest());
+    } finally {
+      jis.close();
+    }
+  }
+}
\ No newline at end of file