Posted to commits@kudu.apache.org by jd...@apache.org on 2017/08/09 22:02:29 UTC
kudu git commit: kudu client tools for hadoop and spark import/export (csv, parquet, avro)
Repository: kudu
Updated Branches:
refs/heads/master 4b045c1d3 -> 5d53a3b71
kudu client tools for hadoop and spark import/export(csv,parquet,avro)
Change-Id: If462af948651f3869b444e82151c3559fde19142
Reviewed-on: http://gerrit.cloudera.org:8080/7421
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5d53a3b7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5d53a3b7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5d53a3b7
Branch: refs/heads/master
Commit: 5d53a3b7146c6bcf330ddae036cfc42eeb7b7849
Parents: 4b045c1
Author: sany <sa...@gmail.com>
Authored: Fri Jul 14 16:54:57 2017 +0530
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Aug 9 22:02:13 2017 +0000
----------------------------------------------------------------------
java/kudu-client-tools/pom.xml | 5 +
.../apache/kudu/mapreduce/tools/ExportCsv.java | 109 +++++++++++
.../kudu/mapreduce/tools/ExportCsvMapper.java | 114 ++++++++++++
.../kudu/mapreduce/tools/ImportParquet.java | 180 +++++++++++++++++++
.../mapreduce/tools/ImportParquetMapper.java | 113 ++++++++++++
.../mapreduce/tools/ParquetReadSupport.java | 36 ++++
.../kudu/mapreduce/tools/ITExportCsv.java | 88 +++++++++
.../kudu/mapreduce/tools/ITImportParquet.java | 147 +++++++++++++++
.../tools/ITImportParquetPreCheck.java | 151 ++++++++++++++++
java/kudu-spark-tools/pom.xml | 8 +-
.../kudu/spark/tools/ImportExportFiles.scala | 159 ++++++++++++++++
.../spark/tools/TestImportExportFiles.scala | 82 +++++++++
java/pom.xml | 15 +-
13 files changed, 1200 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/pom.xml b/java/kudu-client-tools/pom.xml
index d4908fa..65ac4e3 100644
--- a/java/kudu-client-tools/pom.xml
+++ b/java/kudu-client-tools/pom.xml
@@ -86,6 +86,11 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
new file mode 100644
index 0000000..3460a50
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableInputFormat;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Kudu rows and writes them into a CSV file.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExportCsv extends Configured implements Tool {
+
+ static final String NAME = "exportcsv";
+ static final String DEFAULT_SEPARATOR = "\t";
+ static final String SEPARATOR_CONF_KEY = "exportcsv.separator";
+ static final String JOB_NAME_CONF_KEY = "exportcsv.job.name";
+ static final String COLUMNS_NAMES_KEY = "exportcsv.column.names";
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf the current configuration
+ * @param args the command line parameters
+ * @return the newly created job
+ * @throws java.io.IOException when setting up the job fails
+ */
+ @SuppressWarnings("deprecation")
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException, ClassNotFoundException {
+
+ Class<ExportCsvMapper> mapperClass = ExportCsvMapper.class;
+ conf.set(COLUMNS_NAMES_KEY, args[0]);
+ String tableName = args[1];
+ final Path outputDir = new Path(args[2]);
+
+ String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+ Job job = new Job(conf, jobName);
+ job.setJarByClass(mapperClass);
+ job.setInputFormatClass(KuduTableInputFormat.class);
+ new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(job, tableName,
+ args[0]).configure();
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setMapperClass(mapperClass);
+ job.setNumReduceTasks(0);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ return job;
+ }
+
+ /*
+ * @param errorMsg error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ String usage = "Usage: " + NAME + " <colA,colB,colC> <table.name> <output.dir>\n\n" +
+ "Exports the given table and columns into the specified output path.\n" +
+ "The column names of the Kudu table must be specified in the form of\n" +
+ "comma-separated column names.\n" +
+ "Other options that may be specified with -D include:\n" +
+ "'-D" + SEPARATOR_CONF_KEY + "=|' - e.g. separate on pipes instead of tabs\n" +
+ "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+ "export.\n" + CommandLineParser.getHelpSnippet();
+ System.err.println(usage);
+ }
+
+ @Override
+ public int run(String[] otherArgs) throws Exception {
+ if (otherArgs.length < 3) {
+ usage("Wrong number of arguments: " + otherArgs.length);
+ return -1;
+ }
+ Job job = createSubmittableJob(getConf(), otherArgs);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int status = ToolRunner.run(new ExportCsv(), args);
+ System.exit(status);
+ }
+}
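A minimal driver sketch, mirroring how the ITExportCsv test below invokes the tool; the master address, table name, separator, and output directory are hypothetical placeholders:

    import org.apache.hadoop.util.ToolRunner;

    import org.apache.kudu.mapreduce.CommandLineParser;
    import org.apache.kudu.mapreduce.tools.ExportCsv;

    public class ExportCsvDriver {
      public static void main(String[] args) throws Exception {
        // ToolRunner parses the -D options before ExportCsv.run() sees the
        // remaining arguments: <columns> <table.name> <output.dir>.
        String[] toolArgs = new String[] {
            "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=master1:7051", // hypothetical master
            "-Dexportcsv.separator=|", // optional: pipe-separated output
            "*",                       // export all columns
            "my_table",                // hypothetical table name
            "/tmp/exportdata"          // hypothetical output directory
        };
        System.exit(ToolRunner.run(new ExportCsv(), toolArgs));
      }
    }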
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
new file mode 100644
index 0000000..bbe855c
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Bytes;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * Mapper that ingests Kudu rows and turns them into CSV lines.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExportCsvMapper extends Mapper<NullWritable, RowResult, NullWritable, Text> {
+
+ private static final NullWritable NULL_KEY = NullWritable.get();
+
+ /** Column separator */
+ private String separator;
+
+ private Schema schema;
+
+ /**
+ * Handles initializing this class with objects specific to it.
+ */
+ @Override
+ protected void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ this.separator = conf.get(ExportCsv.SEPARATOR_CONF_KEY, ExportCsv.DEFAULT_SEPARATOR);
+ }
+
+ /**
+ * Converts Kudu RowResult into a line of CSV text.
+ */
+ @Override
+ public void map(NullWritable key, RowResult value, Context context) throws IOException {
+ this.schema = value.getSchema();
+ try {
+ context.write(NULL_KEY, new Text(rowResultToString(value)));
+ } catch (InterruptedException e) {
+ throw new IOException("Failing task since it was interrupted", e);
+ }
+ }
+
+ private String rowResultToString(RowResult value) {
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < schema.getColumnCount(); i++) {
+ ColumnSchema col = schema.getColumnByIndex(i);
+ if (i != 0) {
+ buf.append(this.separator);
+ }
+
+ switch (col.getType()) {
+ case INT8:
+ buf.append(value.getByte(i));
+ break;
+ case INT16:
+ buf.append(value.getShort(i));
+ break;
+ case INT32:
+ buf.append(value.getInt(i));
+ break;
+ case INT64:
+ buf.append(value.getLong(i));
+ break;
+ case STRING:
+ buf.append(value.getString(i));
+ break;
+ case BINARY:
+ buf.append(Bytes.pretty(value.getBinaryCopy(i)));
+ break;
+ case FLOAT:
+ buf.append(value.getFloat(i));
+ break;
+ case DOUBLE:
+ buf.append(value.getDouble(i));
+ break;
+ case BOOL:
+ buf.append(value.getBoolean(i));
+ break;
+ case UNIXTIME_MICROS:
+ buf.append(value.getLong(i));
+ break;
+ default:
+ buf.append("<unknown type!>");
+ break;
+ }
+ }
+ return buf.toString();
+ }
+}
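For illustration, a standalone sketch of the line format the mapper emits: column values joined by the configured separator (tab by default). The row values below are hypothetical and mirror the test data used later in this commit:

    import java.util.StringJoiner;

    public class CsvLineFormatExample {
      public static void main(String[] args) {
        // Same default as ExportCsv.DEFAULT_SEPARATOR; overridden at runtime
        // with -Dexportcsv.separator=<sep>.
        String separator = "\t";
        StringJoiner line = new StringJoiner(separator);
        line.add("1");        // INT32 key
        line.add("3");        // INT32 column1_i
        line.add("2.3");      // DOUBLE column2_d
        line.add("a string"); // STRING column3_s
        line.add("true");     // BOOL column4_b
        System.out.println(line); // -> 1\t3\t2.3\ta string\ttrue
      }
    }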
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
new file mode 100644
index 0000000..eff500d
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import static java.sql.Types.TIMESTAMP;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Apache Parquet files and inserts them into a single Kudu table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ImportParquet extends Configured implements Tool {
+
+ static final String NAME = "importparquet";
+ static final String JOB_NAME_CONF_KEY = "importparquet.job.name";
+ static final String PARQUET_INPUT_SCHEMA = "importparquet.input.schema";
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf the current configuration
+ * @param args the command line parameters
+ * @return the newly created job
+ * @throws java.io.IOException when setting up the job fails
+ */
+ @SuppressWarnings("deprecation")
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException, ClassNotFoundException {
+
+ final String tableName = args[0];
+ Path inputDir = new Path(args[1]);
+
+ List<Footer> footers = new ArrayList<Footer>();
+ footers.addAll(ParquetFileReader.readFooters(conf, inputDir));
+
+ MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+ GroupWriteSupport.setSchema(schema, conf);
+ conf.set(PARQUET_INPUT_SCHEMA, schema.toString());
+
+ String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+ Job job = new Job(conf, jobName);
+ job.setJarByClass(ImportParquet.class);
+ job.setMapperClass(ImportParquetMapper.class);
+ job.setNumReduceTasks(0);
+ job.setInputFormatClass(ParquetInputFormat.class);
+ ParquetInputFormat.setReadSupportClass(job, ParquetReadSupport.class);
+ ParquetInputFormat.setInputPaths(job, inputDir);
+
+ CommandLineParser cmdLineParser = new CommandLineParser(conf);
+ KuduClient client = cmdLineParser.getClient();
+ KuduTable table = client.openTable(tableName);
+
+ // Pre-flight checks of input parquet schema and table schema.
+ for (ColumnSchema columnSchema : table.getSchema().getColumns()) {
+ if (schema.containsField(columnSchema.getName())) {
+ if (!schema.getType(columnSchema.getName()).asPrimitiveType().getPrimitiveTypeName()
+ .equals(getTypeName(columnSchema.getType()))) {
+ throw new IllegalArgumentException("The column type " +
+ getTypeName(columnSchema.getType()) + " does not exist in Parquet schema");
+ }
+ } else {
+ throw new IllegalArgumentException("The column " + columnSchema.getName() +
+ " does not exist in Parquet schema");
+ }
+ }
+ // Kudu doesn't support Parquet's TIMESTAMP.
+ Iterator<ColumnDescriptor> fields = schema.getColumns().iterator();
+ while (fields.hasNext()) {
+ // Hold the field in a local variable: calling next() twice per iteration
+ // would skip fields and can throw NoSuchElementException.
+ ColumnDescriptor field = fields.next();
+ if (field.getType().equals(TIMESTAMP)) {
+ throw new IllegalArgumentException("This " + field.getType() +
+ " Parquet type is not supported in Kudu");
+ }
+ }
+
+ FileInputFormat.setInputPaths(job, inputDir);
+ new KuduTableMapReduceUtil.TableOutputFormatConfiguratorWithCommandLineParser(
+ job,
+ tableName)
+ .configure();
+ return job;
+ }
+
+ private static PrimitiveType.PrimitiveTypeName getTypeName(Type type) {
+ switch (type) {
+ case BOOL:
+ return PrimitiveType.PrimitiveTypeName.BOOLEAN;
+ case INT8:
+ return PrimitiveType.PrimitiveTypeName.INT32;
+ case INT16:
+ return PrimitiveType.PrimitiveTypeName.INT64;
+ case INT32:
+ return PrimitiveType.PrimitiveTypeName.INT32;
+ case INT64:
+ return PrimitiveType.PrimitiveTypeName.INT64;
+ case STRING:
+ return PrimitiveType.PrimitiveTypeName.BINARY;
+ case FLOAT:
+ return PrimitiveType.PrimitiveTypeName.FLOAT;
+ case DOUBLE:
+ return PrimitiveType.PrimitiveTypeName.DOUBLE;
+ default:
+ throw new IllegalArgumentException("Type " + type.getName() + " not recognized");
+ }
+ }
+
+ /*
+ * @param errorMsg error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ String usage =
+ "Usage: " + NAME + " <table.name> <input.dir>\n\n" +
+ "Imports the given input directory of Apache Parquet data into the specified table.\n" +
+ "Other options that may be specified with -D include:\n" +
+ "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+ "import.\n" + CommandLineParser.getHelpSnippet();
+
+ System.err.println(usage);
+ }
+
+ @Override
+ public int run(String[] otherArgs) throws Exception {
+ if (otherArgs.length < 2) {
+ usage("Wrong number of arguments: " + otherArgs.length);
+ return -1;
+ }
+ Job job = createSubmittableJob(getConf(), otherArgs);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int status = ToolRunner.run(new ImportParquet(), args);
+ System.exit(status);
+ }
+}
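A matching driver sketch for the import side, again following the pattern of the integration tests; the master address, table name, and input directory are hypothetical. The destination table must already exist and, per the pre-flight checks above, every Kudu column must appear in the Parquet schema with a matching primitive type:

    import org.apache.hadoop.util.ToolRunner;

    import org.apache.kudu.mapreduce.CommandLineParser;
    import org.apache.kudu.mapreduce.tools.ImportParquet;

    public class ImportParquetDriver {
      public static void main(String[] args) throws Exception {
        String[] toolArgs = new String[] {
            "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=master1:7051", // hypothetical master
            "my_table",          // hypothetical destination table (must exist)
            "/tmp/parquet-input" // hypothetical directory of Parquet files
        };
        System.exit(ToolRunner.run(new ImportParquet(), toolArgs));
      }
    }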
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
new file mode 100644
index 0000000..bf40442
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Mapper that ingests Apache Parquet lines and turns them into Kudu Inserts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ImportParquetMapper extends Mapper<LongWritable, Group, NullWritable, Operation> {
+
+ private static final NullWritable NULL_KEY = NullWritable.get();
+
+ private MessageType parquetSchema;
+
+ private KuduTable table;
+ private Schema schema;
+
+ /**
+ * Handles initializing this class with objects specific to it (i.e., the parser).
+ */
+ @Override
+ protected void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ parquetSchema = MessageTypeParser.parseMessageType(conf.get(
+ ImportParquet.PARQUET_INPUT_SCHEMA));
+
+ this.table = KuduTableMapReduceUtil.getTableFromContext(context);
+ this.schema = this.table.getSchema();
+ }
+
+ /**
+ * Convert a line of Parquet data into a Kudu Insert
+ */
+ @Override
+ public void map(LongWritable key, Group value, Context context)
+ throws IOException {
+
+ try {
+ Insert insert = this.table.newInsert();
+ PartialRow row = insert.getRow();
+ for (int i = 0; i < parquetSchema.getFields().size(); i++) {
+ String colName = parquetSchema.getFields().get(i).getName();
+ ColumnSchema col = this.schema.getColumn(colName);
+ String colValue = value.getValueToString(i, 0);
+ switch (col.getType()) {
+ case BOOL:
+ row.addBoolean(colName, Boolean.parseBoolean(colValue));
+ break;
+ case INT8:
+ row.addByte(colName, Byte.parseByte(colValue));
+ break;
+ case INT16:
+ row.addShort(colName, Short.parseShort(colValue));
+ break;
+ case INT32:
+ row.addInt(colName, Integer.parseInt(colValue));
+ break;
+ case INT64:
+ row.addLong(colName, Long.parseLong(colValue));
+ break;
+ case STRING:
+ row.addString(colName, colValue);
+ break;
+ case FLOAT:
+ row.addFloat(colName, Float.parseFloat(colValue));
+ break;
+ case DOUBLE:
+ row.addDouble(colName, Double.parseDouble(colValue));
+ break;
+ default:
+ throw new IllegalArgumentException("Type " + col.getType() + " not recognized");
+ }
+ }
+ context.write(NULL_KEY, insert);
+ } catch (InterruptedException e) {
+ throw new IOException("Failing task since it was interrupted", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
new file mode 100644
index 0000000..6762f99
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.api.DelegatingReadSupport;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+/**
+ * Read support for Apache Parquet.
+ */
+public final class ParquetReadSupport extends DelegatingReadSupport<Group> {
+
+ public ParquetReadSupport() {
+ super(new GroupReadSupport());
+ }
+
+ @Override
+ public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+ return super.init(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
new file mode 100644
index 0000000..984ec63
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITExportCsv extends BaseKuduTest {
+
+ private static final String TABLE_NAME =
+ ITExportCsv.class.getName() + "-" + System.currentTimeMillis();
+
+ private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+ private static Schema schema;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ BaseKuduTest.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ BaseKuduTest.tearDownAfterClass();
+ } finally {
+ HADOOP_UTIL.cleanup();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ String testHome =
+ HADOOP_UTIL.setupAndGetTestDir(ITExportCsv.class.getName(), conf).getAbsolutePath();
+
+ // Create a table with one empty tablet and 3 tablets of 3 rows each.
+ createFourTabletsTableWithNineRows(TABLE_NAME);
+ String[] args = new String[] {
+ "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(),
+ "*", TABLE_NAME, testHome + "/exportdata"};
+
+ GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+ Job job = ExportCsv.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+ assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+ String csvContent = readCsvFile(new File(testHome + "/exportdata/part-m-00001"));
+ assertEquals(3, csvContent.split("\n").length);
+ assertEquals("a string", csvContent.split("\n")[0].split("\t")[3]);
+ }
+
+ private String readCsvFile(File data) throws IOException {
+ FileInputStream fos = new FileInputStream(data);
+ return IOUtils.toString(fos, "UTF-8");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
new file mode 100644
index 0000000..0761a75
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kudu.client.KuduTable;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITImportParquet extends BaseKuduTest {
+
+ private static final String TABLE_NAME =
+ ITImportParquet.class.getName() + "-" + System.currentTimeMillis();
+
+ private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+ private static Schema schema;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ BaseKuduTest.setUpBeforeClass();
+
+ ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+ .key(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+ .nullable(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL)
+ .build());
+ schema = new Schema(columns);
+
+ createTable(TABLE_NAME, schema,
+ new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ BaseKuduTest.tearDownAfterClass();
+ } finally {
+ HADOOP_UTIL.cleanup();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ String testHome =
+ HADOOP_UTIL.setupAndGetTestDir(ITImportParquet.class.getName(), conf).getAbsolutePath();
+
+ // Create a Parquet input file with 4 records.
+ Path data = new Path(testHome, "data.parquet");
+ writeParquetFile(data, conf);
+
+ StringBuilder sb = new StringBuilder();
+ for (ColumnSchema col : schema.getColumns()) {
+ sb.append(col.getName());
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(),
+ TABLE_NAME, data.toString()};
+
+ GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+ Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+ assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+ KuduTable openTable = openTable(TABLE_NAME);
+ assertEquals(4, countRowsInScan(
+ client.newScannerBuilder(openTable).build()));
+ assertEquals("INT32 key=1, INT32 column1_i=3, DOUBLE column2_d=2.3, STRING column3_s=some string, " +
+ "BOOL column4_b=true",scanTableToStrings(openTable).get(0));
+ }
+
+ private void writeParquetFile(Path data, Configuration conf) throws IOException {
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required int32 key; "
+ + "required int32 column1_i; "
+ + "required double column2_d; "
+ + "required binary column3_s; "
+ + "required boolean column4_b; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+
+ writer.write(f.newGroup().append("key", 1).append("column1_i", 3).append("column2_d", 2.3)
+ .append("column3_s", "some string").append("column4_b", true));
+ writer.write(f.newGroup().append("key", 2).append("column1_i", 5).append("column2_d", 4.5)
+ .append("column3_s", "some more").append("column4_b", false));
+ writer.write(f.newGroup().append("key", 3).append("column1_i", 7).append("column2_d", 5.6)
+ .append("column3_s", "some more and more").append("column4_b", true));
+ writer.write(f.newGroup().append("key", 4).append("column1_i", 9).append("column2_d",10.9)
+ .append("column3_s", "some more and alst").append("column4_b", false));
+ writer.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
new file mode 100644
index 0000000..ab332ed
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kudu.client.KuduTable;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITImportParquetPreCheck extends BaseKuduTest {
+
+ private static final String TABLE_NAME =
+ ITImportParquetPreCheck.class.getName() + "-" + System.currentTimeMillis();
+
+ private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+ private static Schema schema;
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ BaseKuduTest.setUpBeforeClass();
+
+ ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+ .key(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+ .nullable(true)
+ .build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL)
+ .build());
+ schema = new Schema(columns);
+
+ createTable(TABLE_NAME, schema,
+ new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ BaseKuduTest.tearDownAfterClass();
+ } finally {
+ HADOOP_UTIL.cleanup();
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ Configuration conf = new Configuration();
+ String testHome =
+ HADOOP_UTIL.setupAndGetTestDir(ITImportParquetPreCheck.class.getName(), conf).getAbsolutePath();
+
+ // Create a Parquet input file with 4 records.
+ Path data = new Path(testHome, "data.parquet");
+ writeParquetFile(data, conf);
+
+ StringBuilder sb = new StringBuilder();
+ for (ColumnSchema col : schema.getColumns()) {
+ sb.append(col.getName());
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" +
+ getMasterAddresses(), TABLE_NAME, data.toString()};
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("The column column1_i does not exist in Parquet schema");
+
+ GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+ Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+ job.waitForCompletion(true);
+
+ KuduTable openTable = openTable(TABLE_NAME);
+ assertEquals(0, countRowsInScan(client.newScannerBuilder(openTable).build()));
+ }
+
+ private void writeParquetFile(Path data, Configuration conf) throws IOException {
+ MessageType schema = parseMessageType(
+ "message test { "
+ + "required int32 key; "
+ + "required int32 column1_i_s; "
+ + "required binary column2_d; "
+ + "required binary column3_s; "
+ + "required boolean column4_b; "
+ + "} ");
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(),
+ UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+
+ writer.write(f.newGroup().append("key", 1).append("column1_i_s", 292).append("column2_d", "no type")
+ .append("column3_s", "some string").append("column4_b", true));
+ writer.write(f.newGroup().append("key", 2).append("column1_i_s", 23).append("column2_d", "no type")
+ .append("column3_s", "some more").append("column4_b", false));
+ writer.write(f.newGroup().append("key", 3).append("column1_i_s", 32).append("column2_d", "no type")
+ .append("column3_s", "some more and more").append("column4_b", true));
+ writer.write(f.newGroup().append("key", 4).append("column1_i_s", 22).append("column2_d", "no type")
+ .append("column3_s", "some more and alst").append("column4_b", false));
+ writer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/pom.xml b/java/kudu-spark-tools/pom.xml
index c2eb57f..98ffe28 100644
--- a/java/kudu-spark-tools/pom.xml
+++ b/java/kudu-spark-tools/pom.xml
@@ -18,7 +18,8 @@
// specific language governing permissions and limitations
// under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kudu</groupId>
@@ -98,6 +99,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_2.10</artifactId>
+ <version>${sparkavro.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
new file mode 100644
index 0000000..bc2f0a3
--- /dev/null
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.spark.tools
+
+import org.apache.kudu.client.KuduClient
+import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.kudu.spark.kudu._
+import com.databricks.spark.avro
+import com.google.common.annotations.VisibleForTesting
+
+object ImportExportKudu {
+ val LOG: Logger = LoggerFactory.getLogger(ImportExportKudu.getClass)
+
+ def fail(msg: String): Nothing = {
+ System.err.println(msg)
+ sys.exit(1)
+ }
+
+ def usage: String =
+ s"""
+ | Usage: --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> --path=<path> --table-name=<table-name>
+ | where
+ | operation: import or export data from or to Kudu tables, default: import
+ | format: format of the data to import/export; csv, parquet and avro are supported, default: csv
+ | masterAddrs: comma-separated addresses of Kudu master nodes, default: localhost
+ | path: input or output path for the import/export operation, default: file://
+ | tableName: name of the table to import/export, default: ""
+ | columns: column names for the select statement when exporting from a Kudu table, default: *
+ | delimiter: delimiter for csv import/export, default: ,
+ | header: whether the csv data has a header row, default: false
+ | inferschema: whether to infer the schema on csv import, default: false
+ """.stripMargin
+
+ case class ArgsCls(operation: String = "import",
+ format: String = "csv",
+ masterAddrs: String = "localhost",
+ path: String = "file://",
+ tableName: String = "",
+ columns: String = "*",
+ delimiter: String = ",",
+ header: String = "false",
+ inferschema: String = "false"
+ )
+
+ object ArgsCls {
+ private def parseInner(options: ArgsCls, args: List[String]): ArgsCls = {
+ LOG.info(args.mkString(","))
+ args match {
+ case Nil => options
+ case "--help" :: _ =>
+ System.err.println(usage)
+ sys.exit(0)
+ case flag :: Nil => fail(s"flag $flag has no value\n$usage")
+ case flag :: value :: tail =>
+ val newOptions: ArgsCls = flag match {
+ case "--operation" => options.copy(operation = value)
+ case "--format" => options.copy(format = value)
+ case "--master-addrs" => options.copy(masterAddrs = value)
+ case "--path" => options.copy(path = value)
+ case "--table-name" => options.copy(tableName = value)
+ case "--columns" => options.copy(columns = value)
+ case "--delimiter" => options.copy(delimiter = value)
+ case "--header" => options.copy(header = value)
+ case "--inferschema" => options.copy(inferschema = value)
+ case _ => fail(s"unknown argument given $flag")
+ }
+ parseInner(newOptions, tail)
+ }
+ }
+
+ def parse(args: Array[String]): ArgsCls = {
+ parseInner(ArgsCls(), args.flatMap(_.split('=')).toList)
+ }
+ }
+}
+
+object ImportExportFiles {
+
+ import ImportExportKudu.{LOG, fail}
+
+ var sqlContext: SQLContext = _
+ var kuduOptions: Map[String, String] = _
+
+ def run(args: ArgsCls, sc: SparkContext, sqlContext: SQLContext): Unit = {
+ val kc = new KuduContext(args.masterAddrs, sc)
+ val applicationId = sc.applicationId
+
+ val client: KuduClient = kc.syncClient
+ if (!client.tableExists(args.tableName)) {
+ fail(args.tableName + s" table doesn't exist")
+ }
+
+ kuduOptions = Map(
+ "kudu.table" -> args.tableName,
+ "kudu.master" -> args.masterAddrs)
+
+ args.operation match {
+ case "import" =>
+ args.format match {
+ case "csv" =>
+ val df = sqlContext.read.option("header", args.header).option("delimiter", args.delimiter).csv(args.path)
+ kc.upsertRows(df, args.tableName)
+ case "parquet" =>
+ val df = sqlContext.read.parquet(args.path)
+ kc.upsertRows(df, args.tableName)
+ case "avro" =>
+ val df = sqlContext.read.format("com.databricks.spark.avro").load(args.path)
+ kc.upsertRows(df, args.tableName)
+ case _ => fail(s"unknown format given: ${args.format}")
+ }
+ case "export" =>
+ val df = sqlContext.read.options(kuduOptions).kudu.select(args.columns)
+ args.format match {
+ case "csv" =>
+ df.write.format("com.databricks.spark.csv").option("header", args.header).option("delimiter",
+ args.delimiter).save(args.path)
+ case "parquet" =>
+ df.write.parquet(args.path)
+ case "avro" =>
+ df.write.format("com.databricks.spark.avro").save(args.path)
+ case _ => fail(s"unknown format given: ${args.format}")
+ }
+ case _ => fail(s"unknown operation given: ${args.operation}")
+ }
+ }
+ /**
+ * Entry point for testing. SparkContext is a singleton,
+ * so tests must create and manage their own.
+ */
+ @VisibleForTesting
+ def testMain(args: Array[String], sc: SparkContext): Unit = {
+ sqlContext = new SQLContext(sc)
+ run(ArgsCls.parse(args), sc, sqlContext)
+ }
+
+ def main(args: Array[String]): Unit = {
+ val conf = new SparkConf().setAppName("Import or export files (csv, parquet, avro) from/to Kudu")
+ val sc = new SparkContext(conf)
+ testMain(args, sc)
+ }
+}
+
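A hedged usage sketch for the Spark tool. It is normally launched through spark-submit, which supplies the Spark master; the jar name, master address, path, and table name below are hypothetical. From Java, the same entry point can also be driven through the Scala object's static main() forwarder:

    // spark-submit --class org.apache.kudu.spark.tools.ImportExportFiles \
    //     kudu-spark-tools.jar --operation=import --format=csv \
    //     --master-addrs=master1:7051 --path=/tmp/data.csv --table-name=my_table

    public class ImportExportFilesDriver {
      public static void main(String[] args) {
        org.apache.kudu.spark.tools.ImportExportFiles.main(new String[] {
            "--operation=import",
            "--format=csv",
            "--master-addrs=master1:7051", // hypothetical Kudu master address
            "--path=/tmp/data.csv",        // hypothetical input path
            "--table-name=my_table",       // hypothetical table name
            "--delimiter=,",
            "--header=true"
        });
      }
    }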
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
new file mode 100644
index 0000000..2507853
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.tools
+
+import java.io.{File, FileOutputStream}
+
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.spark.kudu._
+import org.apache.spark.sql.SQLContext
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FunSuite, Matchers}
+import org.spark_project.guava.collect.ImmutableList
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class TestImportExportFiles extends FunSuite with TestContext with Matchers {
+
+ private val TABLE_NAME: String =
+ classOf[TestImportExportFiles].getName + "-" + System.currentTimeMillis
+ var sqlContext : SQLContext = _
+ var kuduOptions : Map[String, String] = _
+
+ test("Spark Import Export") {
+ val schema: Schema = {
+ val columns = ImmutableList.of(
+ new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
+ new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
+ new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(),
+ new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
+ new ColumnSchemaBuilder("column4_b", Type.STRING).build())
+ new Schema(columns)
+ }
+ val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1)
+ kuduClient.createTable(TABLE_NAME, schema, tableOptions)
+
+ val data: File = new File("target/", TABLE_NAME+".csv")
+ writeCsvFile(data)
+
+ ImportExportFiles.testMain(Array("--operation=import",
+ "--format=csv",
+ s"--master-addrs=${miniCluster.getMasterAddresses}",
+ s"--path=${"target/"+TABLE_NAME+".csv"}",
+ s"--table-name=${TABLE_NAME}",
+ "--delimiter=,",
+ "--header=true",
+ "--inferschema=true"), sc)
+ val rdd = kuduContext.kuduRDD(sc, TABLE_NAME, List("key"))
+ assert(rdd.collect.length == 4)
+ assertEquals("[1],[2],[3],[4]", rdd.collect().mkString(","))
+ }
+
+ def writeCsvFile(data: File): Unit = {
+ val fos: FileOutputStream = new FileOutputStream(data)
+ fos.write("key,column1_i,column2_d,column3_s,column4_b\n".getBytes)
+ fos.write("1,3,2.3,some string,true\n".getBytes)
+ fos.write("2,5,4.5,some more,false\n".getBytes)
+ fos.write("3,7,1.2,wait this is not a double bad row,true\n".getBytes)
+ fos.write("4,9,10.1,trailing separator isn't bad mkay?,true\n".getBytes)
+ fos.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 5893735..fed039a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -29,10 +29,10 @@
<!-- inherit from the ASF POM for distribution management -->
<parent>
- <groupId>org.apache</groupId>
- <artifactId>apache</artifactId>
- <version>18</version>
- <relativePath/>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>18</version>
+ <relativePath/>
</parent>
<name>Kudu</name>
@@ -72,6 +72,8 @@
<protobuf.version>3.3.0</protobuf.version>
<slf4j.version>1.7.25</slf4j.version>
<yetus.version>0.4.0</yetus.version>
+ <parquet.version>1.9.0</parquet.version>
+ <sparkavro.version>3.2.0</sparkavro.version>
<!-- Scala Library dependencies -->
<spark1.version>1.6.3</spark1.version>
@@ -83,8 +85,9 @@
<!-- Misc variables -->
<testdata.dir>target/testdata</testdata.dir>
<testArgLine>-enableassertions -Xmx1900m
- -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
- -Djava.awt.headless=true</testArgLine>
+ -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
+ -Djava.awt.headless=true
+ </testArgLine>
</properties>
<modules>