Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/12/29 21:25:35 UTC
[1/3] phoenix git commit: PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 a7da7e387 -> 0f7dca652
refs/heads/4.x-HBase-1.0 1b3a4ca41 -> 55b7adadd
refs/heads/master a7fb22255 -> 943cc530e
PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/943cc530
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/943cc530
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/943cc530
Branch: refs/heads/master
Commit: 943cc530e2f95ef251a2b2a8e4f35020ba9e639d
Parents: a7fb222
Author: ravimagham <ra...@bazaarvoice.com>
Authored: Tue Dec 29 10:14:57 2015 -0800
Committer: ravimagham <ra...@bazaarvoice.com>
Committed: Tue Dec 29 10:14:57 2015 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/CsvBulkLoadToolIT.java | 65 +++++++++++++---
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 80 ++++++++++----------
2 files changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
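For context on what "non-zero exit status" means here: a Hadoop Tool's run() return value only reaches the shell if the driver hands it to System.exit. A minimal driver sketch, assuming the standard ToolRunner pattern (not part of this patch):

    import org.apache.hadoop.util.ToolRunner;

    public static void main(String[] args) throws Exception {
        // ToolRunner.run returns whatever Tool.run returns, and any
        // exception thrown inside run() propagates out of it.
        int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args);
        // System.exit makes the result visible to callers, e.g. $? in a shell.
        System.exit(exitStatus);
    }

With the changes below, run() returns -1 on a failed MapReduce job and lets setup exceptions escape, so either failure path produces a non-zero process exit status.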
http://git-wip-us.apache.org/repos/asf/phoenix/blob/943cc530/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 7daacb4..2970d56 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,6 +17,14 @@
*/
package org.apache.phoenix.mapreduce;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -28,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.QueryServices;
@@ -36,18 +45,9 @@ import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
@Category(NeedsOwnMiniClusterTest.class)
public class CsvBulkLoadToolIT {
@@ -321,4 +321,51 @@ public class CsvBulkLoadToolIT {
stmt.close();
}
+ @Test
+ public void testInvalidArguments() {
+ String tableName = "TABLE8";
+ CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ try {
+ csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input4.csv",
+ "--table", tableName,
+ "--zookeeper", zkQuorum });
+ fail(String.format("Table %s not created, hence should fail",tableName));
+ } catch (Exception ex) {
+ assertTrue(ex instanceof IllegalArgumentException);
+ assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+ }
+ }
+
+ @Test
+ public void testAlreadyExistsOutputPath() {
+ String tableName = "TABLE9";
+ String outputPath = "/tmp/output/tabl9";
+ try {
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+ + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+
+ FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ fs.create(new Path(outputPath));
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1,LastName 1");
+ printWriter.println("2,FirstName 2,LastName 2");
+ printWriter.close();
+
+ CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input9.csv",
+ "--output", outputPath,
+ "--table", tableName,
+ "--zookeeper", zkQuorum });
+
+ fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+ } catch (Exception ex) {
+ assertTrue(ex instanceof FileAlreadyExistsException);
+ }
+ }
}
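Both new tests hinge on run() now surfacing failures as exceptions instead of logging and swallowing them. A usage sketch of the asserted behavior (it borrows the fixture's hbaseTestUtil and zkQuorum from the test class above; the table name is made up):

    CsvBulkLoadTool tool = new CsvBulkLoadTool();
    tool.setConf(hbaseTestUtil.getConfiguration());
    try {
        tool.run(new String[] {
                "--input", "/tmp/missing.csv",
                "--table", "NO_SUCH_TABLE",
                "--zookeeper", zkQuorum });
    } catch (IllegalArgumentException e) {
        // Post-patch, "Table NO_SUCH_TABLE not found" reaches the caller;
        // this is what testInvalidArguments asserts.
    }

Likewise, a pre-existing --output directory now fails fast with the mapred FileAlreadyExistsException raised at job setup, rather than being caught and reduced to a log line inside submitJob.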
http://git-wip-us.apache.org/repos/asf/phoenix/blob/943cc530/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 1d2594d..4b5d618 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,10 +17,14 @@
*/
package org.apache.phoenix.mapreduce;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -57,14 +61,10 @@ import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
/**
* Base tool for running MapReduce-based ingests of data.
@@ -174,8 +174,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
return loadData(conf, cmdLine);
}
- private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
- InterruptedException, ExecutionException, ClassNotFoundException {
+ private int loadData(Configuration conf, CommandLine cmdLine) throws Exception {
String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
@@ -255,47 +254,44 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
/**
* Submits the jobs to the cluster.
* Loads the HFiles onto the respective tables.
+ * @throws Exception
*/
public int submitJob(final Configuration conf, final String qualifiedTableName,
- final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) {
- try {
- Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
- FileInputFormat.addInputPaths(job, inputPaths);
- FileOutputFormat.setOutputPath(job, outputPath);
+ final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
+
+ Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
+ FileInputFormat.addInputPaths(job, inputPaths);
+ FileOutputFormat.setOutputPath(job, outputPath);
- job.setInputFormatClass(TextInputFormat.class);
- job.setMapOutputKeyClass(TableRowkeyPair.class);
- job.setMapOutputValueClass(KeyValue.class);
- job.setOutputKeyClass(TableRowkeyPair.class);
- job.setOutputValueClass(KeyValue.class);
- job.setReducerClass(FormatToKeyValueReducer.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapOutputKeyClass(TableRowkeyPair.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setOutputKeyClass(TableRowkeyPair.class);
+ job.setOutputValueClass(KeyValue.class);
+ job.setReducerClass(FormatToKeyValueReducer.class);
- MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+ MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
- final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
- job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+ final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+ job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
- // give subclasses their hook
- setupJob(job);
+ // give subclasses their hook
+ setupJob(job);
- LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
- boolean success = job.waitForCompletion(true);
-
- if (success) {
- LOG.info("Loading HFiles from {}", outputPath);
- completebulkload(conf,outputPath,tablesToBeLoaded);
- }
+ LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+ boolean success = job.waitForCompletion(true);
+ if (success) {
+ LOG.info("Loading HFiles from {}", outputPath);
+ completebulkload(conf,outputPath,tablesToBeLoaded);
LOG.info("Removing output directory {}", outputPath);
- if (!FileSystem.get(conf).delete(outputPath, true)) {
- LOG.error("Removing output directory {} failed", outputPath);
+ if(!FileSystem.get(conf).delete(outputPath, true)) {
+ LOG.error("Failed to delete the output directory {}", outputPath);
}
return 0;
- } catch(Exception e) {
- LOG.error("Error occurred submitting BulkLoad ", e);
+ } else {
return -1;
}
-
}
private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
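The submitJob rewrite above is the heart of the fix: the blanket try/catch that logged errors and returned is gone, the method declares throws Exception, and the job outcome drives the return value. Condensed, the post-patch control flow is (paraphrased from the hunk above):

    boolean success = job.waitForCompletion(true);
    if (success) {
        completebulkload(conf, outputPath, tablesToBeLoaded);   // may throw
        if (!FileSystem.get(conf).delete(outputPath, true)) {
            LOG.error("Failed to delete the output directory {}", outputPath);
        }
        return 0;    // import succeeded: exit status 0
    } else {
        return -1;   // MapReduce job failed: non-zero exit status
    }

Errors during setup (missing table, existing output path) or during the bulk load now propagate up through loadData and run to the driver, which is why loadData's throws clause widens to a plain Exception.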
[2/3] phoenix git commit: PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Posted by ra...@apache.org.
PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0f7dca65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0f7dca65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0f7dca65
Branch: refs/heads/4.x-HBase-0.98
Commit: 0f7dca652270bb404f1bcebf5b73a557dc2e5120
Parents: a7da7e3
Author: ravimagham <ra...@bazaarvoice.com>
Authored: Tue Dec 29 10:16:20 2015 -0800
Committer: ravimagham <ra...@bazaarvoice.com>
Committed: Tue Dec 29 10:16:20 2015 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/CsvBulkLoadToolIT.java | 65 +++++++++++++---
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 80 ++++++++++----------
2 files changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
[3/3] phoenix git commit: PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Posted by ra...@apache.org.
PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55b7adad
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55b7adad
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55b7adad
Branch: refs/heads/4.x-HBase-1.0
Commit: 55b7adadda89cbd84fa7fb45067ecf3dbc78f29c
Parents: 1b3a4ca
Author: ravimagham <ra...@bazaarvoice.com>
Authored: Tue Dec 29 10:17:05 2015 -0800
Committer: ravimagham <ra...@bazaarvoice.com>
Committed: Tue Dec 29 10:17:05 2015 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/CsvBulkLoadToolIT.java | 65 +++++++++++++---
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 80 ++++++++++----------
2 files changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------