You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:30:56 UTC
[02/11] hive git commit: HIVE-13509: HCatalog getSplits should ignore
the partition with invalid path (Chaoyu Tang,
reviewed by Szehon Ho and Mithun Radhakrishnan)
HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4377c7ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4377c7ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4377c7ff
Branch: refs/heads/llap
Commit: 4377c7ff03c93f0b1c69305a1b8852f579a3130b
Parents: ba864a2
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:10:59 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:10:59 2016 -0400
----------------------------------------------------------------------
HIVE-13509.2.patch | 478 +++++++++++++++++++
.../hive/hcatalog/common/HCatConstants.java | 3 +
.../hcatalog/mapreduce/HCatBaseInputFormat.java | 29 +-
.../hive/hcatalog/pig/TestHCatLoader.java | 55 +++
4 files changed, 559 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/HIVE-13509.2.patch
----------------------------------------------------------------------
diff --git a/HIVE-13509.2.patch b/HIVE-13509.2.patch
new file mode 100644
index 0000000..930b1f7
--- /dev/null
+++ b/HIVE-13509.2.patch
@@ -0,0 +1,478 @@
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+index 6b03fcb..d165e7e 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+@@ -208,4 +208,7 @@ private HCatConstants() { // restrict instantiation
+ */
+ public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+ public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
++
++ public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
++ public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
+ }
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+index adfaf4e..dbbdd61 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+@@ -21,11 +21,11 @@
+
+ import java.io.IOException;
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.Map;
+ import java.util.HashMap;
+ import java.util.List;
+-
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+@@ -127,7 +127,10 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+ //For each matching partition, call getSplits on the underlying InputFormat
+ for (PartInfo partitionInfo : partitionInfoList) {
+ jobConf = HCatUtil.getJobConfFromContext(jobContext);
+- setInputPath(jobConf, partitionInfo.getLocation());
++ List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
++ if (setInputPath.isEmpty()) {
++ continue;
++ }
+ Map<String, String> jobProperties = partitionInfo.getJobProperties();
+
+ HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+@@ -281,7 +284,7 @@ private static InputJobInfo getJobInfo(Configuration conf)
+ return (InputJobInfo) HCatUtil.deserialize(jobString);
+ }
+
+- private void setInputPath(JobConf jobConf, String location)
++ private List<String> setInputPath(JobConf jobConf, String location)
+ throws IOException {
+
+ // ideally we should just call FileInputFormat.setInputPaths() here - but
+@@ -322,19 +325,33 @@ private void setInputPath(JobConf jobConf, String location)
+ }
+ pathStrings.add(location.substring(pathStart, length));
+
+- Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+ String separator = "";
+ StringBuilder str = new StringBuilder();
+
+- for (Path path : paths) {
++ boolean ignoreInvalidPath =jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
++ HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
++ Iterator<String> pathIterator = pathStrings.iterator();
++ while (pathIterator.hasNext()) {
++ String pathString = pathIterator.next();
++ if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
++ continue;
++ }
++ Path path = new Path(pathString);
+ FileSystem fs = path.getFileSystem(jobConf);
++ if (ignoreInvalidPath && !fs.exists(path)) {
++ pathIterator.remove();
++ continue;
++ }
+ final String qualifiedPath = fs.makeQualified(path).toString();
+ str.append(separator)
+ .append(StringUtils.escapeString(qualifiedPath));
+ separator = StringUtils.COMMA_STR;
+ }
+
+- jobConf.set("mapred.input.dir", str.toString());
++ if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
++ jobConf.set("mapred.input.dir", str.toString());
++ }
++ return pathStrings;
+ }
+
+ }
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+index 2440cb5..4e23fa2 100644
+--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+@@ -66,6 +66,7 @@
+ import org.apache.pig.data.Tuple;
+ import org.apache.pig.impl.logicalLayer.schema.Schema;
+ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
+ import org.joda.time.DateTime;
+ import org.junit.After;
+ import org.junit.Before;
+@@ -102,6 +103,7 @@
+ add("testReadPartitionedBasic");
+ add("testProjectionsBasic");
+ add("testColumnarStorePushdown2");
++ add("testReadMissingPartitionBasicNeg");
+ }});
+ }};
+
+@@ -438,6 +440,59 @@ public void testReadPartitionedBasic() throws IOException, CommandNeedRetryExcep
+ }
+
+ @Test
++ public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
++ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++ PigServer server = new PigServer(ExecType.LOCAL);
++
++ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++ if (!removeDirectory(removedPartitionDir)) {
++ System.out.println("Test did not run because its environment could not be set.");
++ return;
++ }
++ driver.run("select * from " + PARTITIONED_TABLE);
++ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++ driver.getResults(valuesReadFromHiveDriver);
++ assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ Schema dumpedWSchema = server.dumpSchema("W");
++ List<FieldSchema> Wfields = dumpedWSchema.getFields();
++ assertEquals(3, Wfields.size());
++ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++ assertTrue(Wfields.get(0).type == DataType.INTEGER);
++ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++ try {
++ Iterator<Tuple> WIter = server.openIterator("W");
++ fail("Should failed in retriving an invalid partition");
++ } catch (IOException ioe) {
++ // expected
++ }
++ }
++
++ private static boolean removeDirectory(File dir) {
++ boolean success = false;
++ if (dir.isDirectory()) {
++ File[] files = dir.listFiles();
++ if (files != null && files.length > 0) {
++ for (File file : files) {
++ success = removeDirectory(file);
++ if (!success) {
++ return false;
++ }
++ }
++ }
++ success = dir.delete();
++ } else {
++ success = dir.delete();
++ }
++ return success;
++ }
++
++ @Test
+ public void testProjectionsBasic() throws IOException {
+ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+new file mode 100644
+index 0000000..41fe79b
+--- /dev/null
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+@@ -0,0 +1,305 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.hive.hcatalog.pig;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.io.PrintWriter;
++import java.io.RandomAccessFile;
++import java.sql.Date;
++import java.sql.Timestamp;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Properties;
++import java.util.Set;
++
++import org.apache.commons.io.FileUtils;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FileUtil;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hive.cli.CliSessionState;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.ql.CommandNeedRetryException;
++import org.apache.hadoop.hive.ql.Driver;
++import org.apache.hadoop.hive.ql.WindowsPathUtil;
++import org.apache.hadoop.hive.ql.io.IOConstants;
++import org.apache.hadoop.hive.ql.io.StorageFormats;
++import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
++import org.apache.hadoop.hive.ql.session.SessionState;
++import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.util.Shell;
++import org.apache.hive.hcatalog.HcatTestUtils;
++import org.apache.hive.hcatalog.common.HCatUtil;
++import org.apache.hive.hcatalog.common.HCatConstants;
++import org.apache.hive.hcatalog.data.Pair;
++import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
++import org.apache.pig.ExecType;
++import org.apache.pig.PigRunner;
++import org.apache.pig.PigServer;
++import org.apache.pig.ResourceStatistics;
++import org.apache.pig.tools.pigstats.OutputStats;
++import org.apache.pig.tools.pigstats.PigStats;
++import org.apache.pig.data.DataType;
++import org.apache.pig.data.Tuple;
++import org.apache.pig.impl.logicalLayer.schema.Schema;
++import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
++import org.joda.time.DateTime;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import static org.junit.Assert.*;
++import static org.junit.Assume.assumeTrue;
++
++@RunWith(Parameterized.class)
++public class TestHCatLoaderWithProps {
++ private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderWithProps.class);
++ private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
++ File.separator + TestHCatLoaderWithProps.class.getCanonicalName() + "-" + System.currentTimeMillis());
++ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
++ private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
++
++ private static final String BASIC_TABLE = "junit_unparted_basic";
++ private static final String PARTITIONED_TABLE = "junit_parted_basic";
++
++ private Driver driver;
++ private Map<Integer, Pair<Integer, String>> basicInputData;
++
++ private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
++ new HashMap<String, Set<String>>() {{
++ put(IOConstants.PARQUETFILE, new HashSet<String>() {{
++ add("testReadMissingPartitionBasic");
++ }});
++ }};
++
++ private final String storageFormat;
++
++ @Parameterized.Parameters
++ public static Collection<Object[]> generateParameters() {
++ return StorageFormats.names();
++ }
++
++ public TestHCatLoaderWithProps(String storageFormat) {
++ this.storageFormat = storageFormat;
++ }
++
++ private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
++ dropTable(tablename, driver);
++ }
++
++ static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
++ driver.run("drop table if exists " + tablename);
++ }
++
++ private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
++ createTable(tablename, schema, partitionedBy, driver, storageFormat);
++ }
++
++ static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat)
++ throws IOException, CommandNeedRetryException {
++ String createTable;
++ createTable = "create table " + tablename + "(" + schema + ") ";
++ if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
++ createTable = createTable + "partitioned by (" + partitionedBy + ") ";
++ }
++ createTable = createTable + "stored as " +storageFormat;
++ executeStatementOnDriver(createTable, driver);
++ }
++
++ private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
++ createTable(tablename, schema, null);
++ }
++
++ /**
++ * Execute Hive CLI statement
++ * @param cmd arbitrary statement to execute
++ */
++ static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
++ LOG.debug("Executing: " + cmd);
++ CommandProcessorResponse cpr = driver.run(cmd);
++ if(cpr.getResponseCode() != 0) {
++ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
++ }
++ }
++
++ @Before
++ public void setup() throws Exception {
++ File f = new File(TEST_WAREHOUSE_DIR);
++ if (f.exists()) {
++ FileUtil.fullyDelete(f);
++ }
++ if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
++ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
++ }
++
++ HiveConf hiveConf = new HiveConf(this.getClass());
++ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
++ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
++ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
++ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
++ hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
++
++ if (Shell.WINDOWS) {
++ WindowsPathUtil.convertPathsFromWindowsToHdfs(hiveConf);
++ }
++
++ driver = new Driver(hiveConf);
++ SessionState.start(new CliSessionState(hiveConf));
++
++ createTable(BASIC_TABLE, "a int, b string");
++ createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
++
++ int LOOP_SIZE = 3;
++ String[] input = new String[LOOP_SIZE * LOOP_SIZE];
++ basicInputData = new HashMap<Integer, Pair<Integer, String>>();
++ int k = 0;
++ for (int i = 1; i <= LOOP_SIZE; i++) {
++ String si = i + "";
++ for (int j = 1; j <= LOOP_SIZE; j++) {
++ String sj = "S" + j + "S";
++ input[k] = si + "\t" + sj;
++ basicInputData.put(k, new Pair<Integer, String>(i, sj));
++ k++;
++ }
++ }
++ HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
++
++ PigServer server = new PigServer(ExecType.LOCAL);
++ server.setBatchOn();
++ int i = 0;
++ server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
++
++ server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
++ server.registerQuery("B = foreach A generate a,b;", ++i);
++ server.registerQuery("B2 = filter B by a < 2;", ++i);
++ server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
++
++ server.registerQuery("C = foreach A generate a,b;", ++i);
++ server.registerQuery("C2 = filter C by a >= 2;", ++i);
++ server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
++
++ server.executeBatch();
++ }
++
++ @After
++ public void tearDown() throws Exception {
++ try {
++ if (driver != null) {
++ dropTable(BASIC_TABLE);
++ dropTable(PARTITIONED_TABLE);
++ }
++ } finally {
++ FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
++ }
++ }
++
++ @Test
++ public void testReadMissingPartitionBasic() throws IOException, CommandNeedRetryException {
++ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++ Properties pigProperties = PropertiesUtil.loadDefaultProperties();
++ pigProperties.setProperty("hcat.input.ignore.invalid.path", "true");
++ PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
++
++ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++ if (!removeDirectory(removedPartitionDir)) {
++ System.out.println("Test did not run because its environment could not be set.");
++ return;
++ }
++ driver.run("select * from " + PARTITIONED_TABLE);
++ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++ driver.getResults(valuesReadFromHiveDriver);
++ assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ Schema dumpedWSchema = server.dumpSchema("W");
++ List<FieldSchema> Wfields = dumpedWSchema.getFields();
++ assertEquals(3, Wfields.size());
++ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++ assertTrue(Wfields.get(0).type == DataType.INTEGER);
++ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++ Iterator<Tuple> WIter = server.openIterator("W");
++ Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
++ while (WIter.hasNext()) {
++ Tuple t = WIter.next();
++ assertTrue(t.size() == 3);
++ assertNotNull(t.get(0));
++ assertNotNull(t.get(1));
++ assertNotNull(t.get(2));
++ assertTrue(t.get(0).getClass() == Integer.class);
++ assertTrue(t.get(1).getClass() == String.class);
++ assertTrue(t.get(2).getClass() == String.class);
++ valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
++ // the returned partition value is always 1
++ assertEquals("1", t.get(2));
++ }
++ assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
++
++ server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ server.registerQuery("P1filter = filter P1 by bkt == '0';");
++ Iterator<Tuple> P1Iter = server.openIterator("P1filter");
++ assertFalse(P1Iter.hasNext());
++
++ server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ server.registerQuery("P2filter = filter P2 by bkt == '1';");
++ Iterator<Tuple> P2Iter = server.openIterator("P2filter");
++ int count2 = 0;
++ while (P2Iter.hasNext()) {
++ Tuple t = P2Iter.next();
++ assertEquals("1", t.get(2));
++ assertTrue(((Integer) t.get(0)) > 1);
++ count2++;
++ }
++ assertEquals(6, count2);
++ }
++
++ private static boolean removeDirectory(File dir) {
++ boolean success = false;
++ if (dir.isDirectory()) {
++ File[] files = dir.listFiles();
++ if (files != null && files.length > 0) {
++ for (File file : files) {
++ success = removeDirectory(file);
++ if (!success) {
++ return false;
++ }
++ }
++ }
++ success = dir.delete();
++ } else {
++ success = dir.delete();
++ }
++ return success;
++ }
++}
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 6b03fcb..d165e7e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -208,4 +208,7 @@ public final class HCatConstants {
*/
public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
+
+ public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
+ public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index adfaf4e..dbbdd61 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.hive.hcatalog.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -127,7 +127,10 @@ public abstract class HCatBaseInputFormat
//For each matching partition, call getSplits on the underlying InputFormat
for (PartInfo partitionInfo : partitionInfoList) {
jobConf = HCatUtil.getJobConfFromContext(jobContext);
- setInputPath(jobConf, partitionInfo.getLocation());
+ List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
+ if (setInputPath.isEmpty()) {
+ continue;
+ }
Map<String, String> jobProperties = partitionInfo.getJobProperties();
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
@@ -281,7 +284,7 @@ public abstract class HCatBaseInputFormat
return (InputJobInfo) HCatUtil.deserialize(jobString);
}
- private void setInputPath(JobConf jobConf, String location)
+ private List<String> setInputPath(JobConf jobConf, String location)
throws IOException {
// ideally we should just call FileInputFormat.setInputPaths() here - but
@@ -322,19 +325,33 @@ public abstract class HCatBaseInputFormat
}
pathStrings.add(location.substring(pathStart, length));
- Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
String separator = "";
StringBuilder str = new StringBuilder();
- for (Path path : paths) {
+ boolean ignoreInvalidPath =jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
+ HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
+ Iterator<String> pathIterator = pathStrings.iterator();
+ while (pathIterator.hasNext()) {
+ String pathString = pathIterator.next();
+ if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
+ continue;
+ }
+ Path path = new Path(pathString);
FileSystem fs = path.getFileSystem(jobConf);
+ if (ignoreInvalidPath && !fs.exists(path)) {
+ pathIterator.remove();
+ continue;
+ }
final String qualifiedPath = fs.makeQualified(path).toString();
str.append(separator)
.append(StringUtils.escapeString(qualifiedPath));
separator = StringUtils.COMMA_STR;
}
- jobConf.set("mapred.input.dir", str.toString());
+ if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
+ jobConf.set("mapred.input.dir", str.toString());
+ }
+ return pathStrings;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
----------------------------------------------------------------------
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 2440cb5..4e23fa2 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -66,6 +66,7 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
@@ -102,6 +103,7 @@ public class TestHCatLoader {
add("testReadPartitionedBasic");
add("testProjectionsBasic");
add("testColumnarStorePushdown2");
+ add("testReadMissingPartitionBasicNeg");
}});
}};
@@ -438,6 +440,59 @@ public class TestHCatLoader {
}
@Test
+ public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
+ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+ PigServer server = new PigServer(ExecType.LOCAL);
+
+ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
+ if (!removeDirectory(removedPartitionDir)) {
+ System.out.println("Test did not run because its environment could not be set.");
+ return;
+ }
+ driver.run("select * from " + PARTITIONED_TABLE);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ assertTrue(valuesReadFromHiveDriver.size() == 6);
+
+ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+ Schema dumpedWSchema = server.dumpSchema("W");
+ List<FieldSchema> Wfields = dumpedWSchema.getFields();
+ assertEquals(3, Wfields.size());
+ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+ assertTrue(Wfields.get(0).type == DataType.INTEGER);
+ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+ try {
+ Iterator<Tuple> WIter = server.openIterator("W");
+ fail("Should failed in retriving an invalid partition");
+ } catch (IOException ioe) {
+ // expected
+ }
+ }
+
+ private static boolean removeDirectory(File dir) {
+ boolean success = false;
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ if (files != null && files.length > 0) {
+ for (File file : files) {
+ success = removeDirectory(file);
+ if (!success) {
+ return false;
+ }
+ }
+ }
+ success = dir.delete();
+ } else {
+ success = dir.delete();
+ }
+ return success;
+ }
+
+ @Test
public void testProjectionsBasic() throws IOException {
assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));