Posted to commits@phoenix.apache.org by el...@apache.org on 2019/02/12 14:52:12 UTC
[phoenix] branch master updated: PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 20bc741 PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
20bc741 is described below
commit 20bc74145762d2b19e80b609bec901489accd5cb
Author: Josh Elser <el...@apache.org>
AuthorDate: Fri Feb 8 11:13:01 2019 -0500
PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
---
.../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 33 ++++++++
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 9 +-
.../phoenix/mapreduce/PhoenixTextInputFormat.java | 97 ++++++++++++++++++++++
3 files changed, 137 insertions(+), 2 deletions(-)
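
For context, the new flag rides along with the existing bulk load options. A representative invocation might look like the following (the jar version, input path, table name, and ZooKeeper quorum are placeholders, not from this commit; --skip-header, short form -k, is the option added here):

    $ hadoop jar phoenix-<version>-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool \
          --table EXAMPLE \
          --input /data/example.csv \
          --zookeeper zk1:2181 \
          --skip-header
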
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 7e4226d..699b469 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -497,4 +497,37 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         stmt.close();
     }
+
+    @Test
+    public void testIgnoreCsvHeader() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE S.TABLE13 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+
+            final Configuration conf = new Configuration(getUtility().getConfiguration());
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input13.csv"));
+            try (PrintWriter printWriter = new PrintWriter(outputStream)) {
+                printWriter.println("id,name");
+                printWriter.println("1,Name 1");
+                printWriter.println("2,Name 2");
+                printWriter.println("3,Name 3");
+            }
+
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(conf);
+            int exitCode = csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input13.csv",
+                    "--table", "table13",
+                    "--schema", "s",
+                    "--zookeeper", zkQuorum,
+                    "--skip-header"});
+            assertEquals(0, exitCode);
+
+            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(1) FROM S.TABLE13")) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
 
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 13c7ab6..e321361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -88,6 +87,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+    static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -111,6 +111,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IMPORT_COLUMNS_OPT);
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
+        options.addOption(SKIP_HEADER_OPT);
         return options;
     }
 
@@ -202,6 +203,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
                 conf.set(entry.getKey(), entry.getValue());
             }
         }
+        // Skip the first line of the CSV file(s)?
+        if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
+            PhoenixTextInputFormat.setSkipHeader(conf);
+        }
 
         final Connection conn = QueryUtil.getConnection(conf);
         if (LOG.isDebugEnabled()) {
@@ -279,7 +284,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         FileInputFormat.addInputPaths(job, inputPaths);
         FileOutputFormat.setOutputPath(job, outputPath);
 
-        job.setInputFormatClass(TextInputFormat.class);
+        job.setInputFormatClass(PhoenixTextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
         job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
new file mode 100644
index 0000000..cc170f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around TextInputFormat which can ignore the first line in the first InputSplit
+ * for a file.
+ */
+public class PhoenixTextInputFormat extends TextInputFormat {
+    public static final String SKIP_HEADER_KEY = "phoenix.input.format.skip.header";
+
+    public static void setSkipHeader(Configuration conf) {
+        conf.setBoolean(SKIP_HEADER_KEY, true);
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+        RecordReader<LongWritable,Text> rr = super.createRecordReader(split, context);
+
+        return new PhoenixLineRecordReader((LineRecordReader) rr);
+    }
+
+    public static class PhoenixLineRecordReader extends RecordReader<LongWritable,Text> {
+        private static final Logger LOG = LoggerFactory.getLogger(PhoenixLineRecordReader.class);
+        private final LineRecordReader rr;
+        private PhoenixLineRecordReader(LineRecordReader rr) {
+            this.rr = rr;
+        }
+
+        @Override
+        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+            rr.initialize(genericSplit, context);
+            final Configuration conf = context.getConfiguration();
+            final FileSplit split = (FileSplit) genericSplit;
+            if (conf.getBoolean(SKIP_HEADER_KEY, false) && split.getStart() == 0) {
+                LOG.trace("Consuming first key-value from {}", genericSplit);
+                nextKeyValue();
+            } else {
+                LOG.trace("Not configured to skip header or not the first input split: {}", split);
+            }
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return rr.nextKeyValue();
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException {
+            return rr.getCurrentKey();
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException {
+            return rr.getCurrentValue();
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return rr.getProgress();
+        }
+
+        @Override
+        public void close() throws IOException {
+            rr.close();
+        }
+    }
+}
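
The wrapper above is not tied to the bulk load tool: it can be dropped into any MapReduce job that reads line-oriented text. A minimal sketch of such a job follows (the class name, the map-only identity-mapper setup, and the argument paths are illustrative assumptions, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.phoenix.mapreduce.PhoenixTextInputFormat;

    public class SkipHeaderJobExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Set the flag before Job.getInstance(), which copies the Configuration.
            PhoenixTextInputFormat.setSkipHeader(conf);

            Job job = Job.getInstance(conf, "skip-header-example");
            job.setJarByClass(SkipHeaderJobExample.class);
            // Only the split with start == 0 consumes a line, so each input
            // file loses exactly its first line no matter how it is split.
            job.setInputFormatClass(PhoenixTextInputFormat.class);
            job.setMapperClass(Mapper.class); // identity mapper, map-only job
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }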