You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/12/29 21:25:37 UTC

[3/3] phoenix git commit: PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails

PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55b7adad
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55b7adad
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55b7adad

Branch: refs/heads/4.x-HBase-1.0
Commit: 55b7adadda89cbd84fa7fb45067ecf3dbc78f29c
Parents: 1b3a4ca
Author: ravimagham <ra...@bazaarvoice.com>
Authored: Tue Dec 29 10:17:05 2015 -0800
Committer: ravimagham <ra...@bazaarvoice.com>
Committed: Tue Dec 29 10:17:05 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    | 65 +++++++++++++---
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 80 ++++++++++----------
 2 files changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/55b7adad/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 7daacb4..2970d56 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,6 +17,14 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -28,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.QueryServices;
@@ -36,18 +45,9 @@ import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 @Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 
@@ -321,4 +321,51 @@ public class CsvBulkLoadToolIT {
         stmt.close();
     }
     
+    @Test
+    public void testInvalidArguments() {
+        String tableName = "TABLE8";
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        try {
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+            fail(String.format("Table %s not created, hence should fail",tableName));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof IllegalArgumentException); 
+            assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+        }
+    }
+    
+    @Test
+    public void testAlreadyExistsOutputPath() {
+        String tableName = "TABLE9";
+        String outputPath = "/tmp/output/tabl9";
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                    + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+            
+            FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+            fs.create(new Path(outputPath));
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+            PrintWriter printWriter = new PrintWriter(outputStream);
+            printWriter.println("1,FirstName 1,LastName 1");
+            printWriter.println("2,FirstName 2,LastName 2");
+            printWriter.close();
+            
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input9.csv",
+                "--output", outputPath,
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+            
+            fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof FileAlreadyExistsException); 
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55b7adad/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 1d2594d..4b5d618 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,10 +17,14 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,14 +61,10 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 
 /**
  * Base tool for running MapReduce-based ingests of data.
@@ -174,8 +174,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         return loadData(conf, cmdLine);
     }
 
-    private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
-            InterruptedException, ExecutionException, ClassNotFoundException {
+    private int loadData(Configuration conf, CommandLine cmdLine) throws Exception {
         String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
         String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
         String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
@@ -255,47 +254,44 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     /**
      * Submits the jobs to the cluster.
      * Loads the HFiles onto the respective tables.
+     * @throws Exception 
      */
     public int submitJob(final Configuration conf, final String qualifiedTableName,
-        final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) {
-        try {
-            Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
-            FileInputFormat.addInputPaths(job, inputPaths);
-            FileOutputFormat.setOutputPath(job, outputPath);
+        final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
+       
+        Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
+        FileInputFormat.addInputPaths(job, inputPaths);
+        FileOutputFormat.setOutputPath(job, outputPath);
 
-            job.setInputFormatClass(TextInputFormat.class);
-            job.setMapOutputKeyClass(TableRowkeyPair.class);
-            job.setMapOutputValueClass(KeyValue.class);
-            job.setOutputKeyClass(TableRowkeyPair.class);
-            job.setOutputValueClass(KeyValue.class);
-            job.setReducerClass(FormatToKeyValueReducer.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setMapOutputKeyClass(TableRowkeyPair.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.setOutputKeyClass(TableRowkeyPair.class);
+        job.setOutputValueClass(KeyValue.class);
+        job.setReducerClass(FormatToKeyValueReducer.class);
 
-            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+        MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
-            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-            job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+        job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
 
-            // give subclasses their hook
-            setupJob(job);
+        // give subclasses their hook
+        setupJob(job);
 
-            LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
-            boolean success = job.waitForCompletion(true);
-
-            if (success) {
-                LOG.info("Loading HFiles from {}", outputPath);
-                completebulkload(conf,outputPath,tablesToBeLoaded);
-            }
+        LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+        boolean success = job.waitForCompletion(true);
 
+        if (success) {
+            LOG.info("Loading HFiles from {}", outputPath);
+            completebulkload(conf,outputPath,tablesToBeLoaded);
             LOG.info("Removing output directory {}", outputPath);
-            if (!FileSystem.get(conf).delete(outputPath, true)) {
-                LOG.error("Removing output directory {} failed", outputPath);
+            if(!FileSystem.get(conf).delete(outputPath, true)) {
+                LOG.error("Failed to delete the output directory {}", outputPath);
             }
             return 0;
-        } catch(Exception e) {
-            LOG.error("Error occurred submitting BulkLoad ", e);
+        } else {
             return -1;
         }
-
     }
 
     private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {