Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:30:56 UTC

[02/11] hive git commit: HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)

HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4377c7ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4377c7ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4377c7ff

Branch: refs/heads/llap
Commit: 4377c7ff03c93f0b1c69305a1b8852f579a3130b
Parents: ba864a2
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:10:59 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:10:59 2016 -0400

----------------------------------------------------------------------
 HIVE-13509.2.patch                              | 478 +++++++++++++++++++
 .../hive/hcatalog/common/HCatConstants.java     |   3 +
 .../hcatalog/mapreduce/HCatBaseInputFormat.java |  29 +-
 .../hive/hcatalog/pig/TestHCatLoader.java       |  55 +++
 4 files changed, 559 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
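
The change is opt-in: reads behave exactly as before unless the new
hcat.input.ignore.invalid.path property (default false) is set to true, in
which case getSplits() skips partitions whose storage directories no longer
exist instead of failing. As a minimal sketch of how a MapReduce consumer
might opt in (the database/table names and job wiring below are
illustrative, not part of this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

  public class ReadWithMissingPartitions {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Opt in: partitions whose directories were deleted out-of-band are
      // pruned from the input instead of failing the job at split time.
      conf.setBoolean("hcat.input.ignore.invalid.path", true);

      Job job = Job.getInstance(conf, "scan-with-missing-partitions");
      HCatInputFormat.setInput(job, "mydb", "mytable"); // placeholder names
      job.setInputFormatClass(HCatInputFormat.class);
      // ... configure mapper and output as usual, then
      // job.waitForCompletion(true);
    }
  }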


http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/HIVE-13509.2.patch
----------------------------------------------------------------------
diff --git a/HIVE-13509.2.patch b/HIVE-13509.2.patch
new file mode 100644
index 0000000..930b1f7
--- /dev/null
+++ b/HIVE-13509.2.patch
@@ -0,0 +1,478 @@
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+index 6b03fcb..d165e7e 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+@@ -208,4 +208,7 @@ private HCatConstants() { // restrict instantiation
+    */
+   public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+   public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
++
++  public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
++  public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
+ }
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+index adfaf4e..dbbdd61 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+@@ -21,11 +21,11 @@
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.Map;
+ import java.util.HashMap;
+ import java.util.List;
+-
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+@@ -127,7 +127,10 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+     //For each matching partition, call getSplits on the underlying InputFormat
+     for (PartInfo partitionInfo : partitionInfoList) {
+       jobConf = HCatUtil.getJobConfFromContext(jobContext);
+-      setInputPath(jobConf, partitionInfo.getLocation());
++      List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
++      if (setInputPath.isEmpty()) {
++        continue;
++      }
+       Map<String, String> jobProperties = partitionInfo.getJobProperties();
+ 
+       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+@@ -281,7 +284,7 @@ private static InputJobInfo getJobInfo(Configuration conf)
+     return (InputJobInfo) HCatUtil.deserialize(jobString);
+   }
+ 
+-  private void setInputPath(JobConf jobConf, String location)
++  private List<String> setInputPath(JobConf jobConf, String location)
+     throws IOException {
+ 
+     // ideally we should just call FileInputFormat.setInputPaths() here - but
+@@ -322,19 +325,33 @@ private void setInputPath(JobConf jobConf, String location)
+     }
+     pathStrings.add(location.substring(pathStart, length));
+ 
+-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+     String separator = "";
+     StringBuilder str = new StringBuilder();
+ 
+-    for (Path path : paths) {
++    boolean ignoreInvalidPath = jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
++        HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
++    Iterator<String> pathIterator = pathStrings.iterator();
++    while (pathIterator.hasNext()) {
++      String pathString = pathIterator.next();
++      if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
++        continue;
++      }
++      Path path = new Path(pathString);
+       FileSystem fs = path.getFileSystem(jobConf);
++      if (ignoreInvalidPath && !fs.exists(path)) {
++        pathIterator.remove();
++        continue;
++      }
+       final String qualifiedPath = fs.makeQualified(path).toString();
+       str.append(separator)
+         .append(StringUtils.escapeString(qualifiedPath));
+       separator = StringUtils.COMMA_STR;
+     }
+ 
+-    jobConf.set("mapred.input.dir", str.toString());
++    if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
++      jobConf.set("mapred.input.dir", str.toString());
++    }
++    return pathStrings;
+   }
+ 
+ }
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+index 2440cb5..4e23fa2 100644
+--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+@@ -66,6 +66,7 @@
+ import org.apache.pig.data.Tuple;
+ import org.apache.pig.impl.logicalLayer.schema.Schema;
+ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
+ import org.joda.time.DateTime;
+ import org.junit.After;
+ import org.junit.Before;
+@@ -102,6 +103,7 @@
+           add("testReadPartitionedBasic");
+           add("testProjectionsBasic");
+           add("testColumnarStorePushdown2");
++          add("testReadMissingPartitionBasicNeg");
+         }});
+       }};
+ 
+@@ -438,6 +440,59 @@ public void testReadPartitionedBasic() throws IOException, CommandNeedRetryExcep
+   }
+ 
+   @Test
++  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
++    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++    PigServer server = new PigServer(ExecType.LOCAL);
++
++    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++    if (!removeDirectory(removedPartitionDir)) {
++      System.out.println("Test did not run because its environment could not be set up.");
++      return;
++    }
++    driver.run("select * from " + PARTITIONED_TABLE);
++    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++    driver.getResults(valuesReadFromHiveDriver);
++    assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    Schema dumpedWSchema = server.dumpSchema("W");
++    List<FieldSchema> Wfields = dumpedWSchema.getFields();
++    assertEquals(3, Wfields.size());
++    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++    assertTrue(Wfields.get(0).type == DataType.INTEGER);
++    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++    try {
++      Iterator<Tuple> WIter = server.openIterator("W");
++      fail("Should failed in retriving an invalid partition");
++    } catch (IOException ioe) {
++      // expected
++    }
++  }
++
++  private static boolean removeDirectory(File dir) {
++    boolean success = false;
++    if (dir.isDirectory()) {
++      File[] files = dir.listFiles();
++      if (files != null && files.length > 0) {
++        for (File file : files) {
++          success = removeDirectory(file);
++          if (!success) {
++            return false;
++          }
++        }
++      }
++      success = dir.delete();
++    } else {
++      success = dir.delete();
++    }
++    return success;
++  }
++
++  @Test
+   public void testProjectionsBasic() throws IOException {
+     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+ 
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+new file mode 100644
+index 0000000..41fe79b
+--- /dev/null
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+@@ -0,0 +1,305 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.hive.hcatalog.pig;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.io.PrintWriter;
++import java.io.RandomAccessFile;
++import java.sql.Date;
++import java.sql.Timestamp;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Properties;
++import java.util.Set;
++
++import org.apache.commons.io.FileUtils;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FileUtil;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hive.cli.CliSessionState;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.ql.CommandNeedRetryException;
++import org.apache.hadoop.hive.ql.Driver;
++import org.apache.hadoop.hive.ql.WindowsPathUtil;
++import org.apache.hadoop.hive.ql.io.IOConstants;
++import org.apache.hadoop.hive.ql.io.StorageFormats;
++import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
++import org.apache.hadoop.hive.ql.session.SessionState;
++import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.util.Shell;
++import org.apache.hive.hcatalog.HcatTestUtils;
++import org.apache.hive.hcatalog.common.HCatUtil;
++import org.apache.hive.hcatalog.common.HCatConstants;
++import org.apache.hive.hcatalog.data.Pair;
++import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
++import org.apache.pig.ExecType;
++import org.apache.pig.PigRunner;
++import org.apache.pig.PigServer;
++import org.apache.pig.ResourceStatistics;
++import org.apache.pig.tools.pigstats.OutputStats;
++import org.apache.pig.tools.pigstats.PigStats;
++import org.apache.pig.data.DataType;
++import org.apache.pig.data.Tuple;
++import org.apache.pig.impl.logicalLayer.schema.Schema;
++import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
++import org.joda.time.DateTime;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import static org.junit.Assert.*;
++import static org.junit.Assume.assumeTrue;
++
++@RunWith(Parameterized.class)
++public class TestHCatLoaderWithProps {
++  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderWithProps.class);
++  private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
++      File.separator + TestHCatLoaderWithProps.class.getCanonicalName() + "-" + System.currentTimeMillis());
++  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
++  private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
++
++  private static final String BASIC_TABLE = "junit_unparted_basic";
++  private static final String PARTITIONED_TABLE = "junit_parted_basic";
++
++  private Driver driver;
++  private Map<Integer, Pair<Integer, String>> basicInputData;
++
++  private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
++      new HashMap<String, Set<String>>() {{
++        put(IOConstants.PARQUETFILE, new HashSet<String>() {{
++          add("testReadMissingPartitionBasic");
++        }});
++      }};
++
++  private final String storageFormat;
++
++  @Parameterized.Parameters
++  public static Collection<Object[]> generateParameters() {
++    return StorageFormats.names();
++  }
++
++  public TestHCatLoaderWithProps(String storageFormat) {
++    this.storageFormat = storageFormat;
++  }
++
++  private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
++    dropTable(tablename, driver);
++  }
++
++  static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
++    driver.run("drop table if exists " + tablename);
++  }
++
++  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
++    createTable(tablename, schema, partitionedBy, driver, storageFormat);
++  }
++
++  static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat)
++      throws IOException, CommandNeedRetryException {
++    String createTable;
++    createTable = "create table " + tablename + "(" + schema + ") ";
++    if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
++      createTable = createTable + "partitioned by (" + partitionedBy + ") ";
++    }
++    createTable = createTable + "stored as " + storageFormat;
++    executeStatementOnDriver(createTable, driver);
++  }
++
++  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
++    createTable(tablename, schema, null);
++  }
++
++  /**
++   * Execute Hive CLI statement
++   * @param cmd arbitrary statement to execute
++   */
++  static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
++    LOG.debug("Executing: " + cmd);
++    CommandProcessorResponse cpr = driver.run(cmd);
++    if (cpr.getResponseCode() != 0) {
++      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
++    }
++  }
++
++  @Before
++  public void setup() throws Exception {
++    File f = new File(TEST_WAREHOUSE_DIR);
++    if (f.exists()) {
++      FileUtil.fullyDelete(f);
++    }
++    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
++      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
++    }
++
++    HiveConf hiveConf = new HiveConf(this.getClass());
++    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
++    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
++    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
++    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
++    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
++
++    if (Shell.WINDOWS) {
++      WindowsPathUtil.convertPathsFromWindowsToHdfs(hiveConf);
++    }
++
++    driver = new Driver(hiveConf);
++    SessionState.start(new CliSessionState(hiveConf));
++
++    createTable(BASIC_TABLE, "a int, b string");
++    createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
++
++    int LOOP_SIZE = 3;
++    String[] input = new String[LOOP_SIZE * LOOP_SIZE];
++    basicInputData = new HashMap<Integer, Pair<Integer, String>>();
++    int k = 0;
++    for (int i = 1; i <= LOOP_SIZE; i++) {
++      String si = i + "";
++      for (int j = 1; j <= LOOP_SIZE; j++) {
++        String sj = "S" + j + "S";
++        input[k] = si + "\t" + sj;
++        basicInputData.put(k, new Pair<Integer, String>(i, sj));
++        k++;
++      }
++    }
++    HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
++
++    PigServer server = new PigServer(ExecType.LOCAL);
++    server.setBatchOn();
++    int i = 0;
++    server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
++
++    server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
++    server.registerQuery("B = foreach A generate a,b;", ++i);
++    server.registerQuery("B2 = filter B by a < 2;", ++i);
++    server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
++
++    server.registerQuery("C = foreach A generate a,b;", ++i);
++    server.registerQuery("C2 = filter C by a >= 2;", ++i);
++    server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
++
++    server.executeBatch();
++  }
++
++  @After
++  public void tearDown() throws Exception {
++    try {
++      if (driver != null) {
++        dropTable(BASIC_TABLE);
++        dropTable(PARTITIONED_TABLE);
++      }
++    } finally {
++      FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
++    }
++  }
++
++  @Test
++  public void testReadMissingPartitionBasic() throws IOException, CommandNeedRetryException {
++    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++    Properties pigProperties = PropertiesUtil.loadDefaultProperties();
++    pigProperties.setProperty("hcat.input.ignore.invalid.path", "true");
++    PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
++
++    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++    if (!removeDirectory(removedPartitionDir)) {
++      System.out.println("Test did not run because its environment could not be set up.");
++      return;
++    }
++    driver.run("select * from " + PARTITIONED_TABLE);
++    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++    driver.getResults(valuesReadFromHiveDriver);
++    assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    Schema dumpedWSchema = server.dumpSchema("W");
++    List<FieldSchema> Wfields = dumpedWSchema.getFields();
++    assertEquals(3, Wfields.size());
++    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++    assertTrue(Wfields.get(0).type == DataType.INTEGER);
++    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++    Iterator<Tuple> WIter = server.openIterator("W");
++    Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
++    while (WIter.hasNext()) {
++      Tuple t = WIter.next();
++      assertTrue(t.size() == 3);
++      assertNotNull(t.get(0));
++      assertNotNull(t.get(1));
++      assertNotNull(t.get(2));
++      assertTrue(t.get(0).getClass() == Integer.class);
++      assertTrue(t.get(1).getClass() == String.class);
++      assertTrue(t.get(2).getClass() == String.class);
++      valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
++      // the returned partition value is always 1
++      assertEquals("1", t.get(2));
++    }
++    assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
++
++    server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    server.registerQuery("P1filter = filter P1 by bkt == '0';");
++    Iterator<Tuple> P1Iter = server.openIterator("P1filter");
++    assertFalse(P1Iter.hasNext());
++
++    server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    server.registerQuery("P2filter = filter P2 by bkt == '1';");
++    Iterator<Tuple> P2Iter = server.openIterator("P2filter");
++    int count2 = 0;
++    while (P2Iter.hasNext()) {
++      Tuple t = P2Iter.next();
++      assertEquals("1", t.get(2));
++      assertTrue(((Integer) t.get(0)) > 1);
++      count2++;
++    }
++    assertEquals(6, count2);
++  }
++
++  private static boolean removeDirectory(File dir) {
++    boolean success = false;
++    if (dir.isDirectory()) {
++      File[] files = dir.listFiles();
++      if (files != null && files.length > 0) {
++        for (File file : files) {
++          success = removeDirectory(file);
++          if (!success) {
++            return false;
++          }
++        }
++      }
++      success = dir.delete();
++    } else {
++      success = dir.delete();
++    }
++    return success;
++  }
++}
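
As the embedded TestHCatLoaderWithProps above shows, Pig users can turn the
same behavior on through the properties handed to PigServer. A minimal
standalone sketch (the table name is a placeholder):

  import java.util.Properties;
  import org.apache.pig.ExecType;
  import org.apache.pig.PigServer;
  import org.apache.pig.impl.util.PropertiesUtil;

  public class PigIgnoreInvalidPath {
    public static void main(String[] args) throws Exception {
      Properties props = PropertiesUtil.loadDefaultProperties();
      props.setProperty("hcat.input.ignore.invalid.path", "true");
      PigServer server = new PigServer(ExecType.LOCAL, props);
      // With the flag set, a partition directory dropped out-of-band no
      // longer makes openIterator() fail with an IOException.
      server.registerQuery("W = load 'mytable' using "
          + "org.apache.hive.hcatalog.pig.HCatLoader();");
      server.openIterator("W");
    }
  }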

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 6b03fcb..d165e7e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -208,4 +208,7 @@ public final class HCatConstants {
    */
   public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
   public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
+
+  public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
+  public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index adfaf4e..dbbdd61 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -127,7 +127,10 @@ public abstract class HCatBaseInputFormat
     //For each matching partition, call getSplits on the underlying InputFormat
     for (PartInfo partitionInfo : partitionInfoList) {
       jobConf = HCatUtil.getJobConfFromContext(jobContext);
-      setInputPath(jobConf, partitionInfo.getLocation());
+      List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
+      if (setInputPath.isEmpty()) {
+        continue;
+      }
       Map<String, String> jobProperties = partitionInfo.getJobProperties();
 
       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
@@ -281,7 +284,7 @@ public abstract class HCatBaseInputFormat
     return (InputJobInfo) HCatUtil.deserialize(jobString);
   }
 
-  private void setInputPath(JobConf jobConf, String location)
+  private List<String> setInputPath(JobConf jobConf, String location)
     throws IOException {
 
     // ideally we should just call FileInputFormat.setInputPaths() here - but
@@ -322,19 +325,33 @@ public abstract class HCatBaseInputFormat
     }
     pathStrings.add(location.substring(pathStart, length));
 
-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
     String separator = "";
     StringBuilder str = new StringBuilder();
 
-    for (Path path : paths) {
+    boolean ignoreInvalidPath = jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
+        HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
+    Iterator<String> pathIterator = pathStrings.iterator();
+    while (pathIterator.hasNext()) {
+      String pathString = pathIterator.next();
+      if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
+        continue;
+      }
+      Path path = new Path(pathString);
       FileSystem fs = path.getFileSystem(jobConf);
+      if (ignoreInvalidPath && !fs.exists(path)) {
+        pathIterator.remove();
+        continue;
+      }
       final String qualifiedPath = fs.makeQualified(path).toString();
       str.append(separator)
         .append(StringUtils.escapeString(qualifiedPath));
       separator = StringUtils.COMMA_STR;
     }
 
-    jobConf.set("mapred.input.dir", str.toString());
+    if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
+      jobConf.set("mapred.input.dir", str.toString());
+    }
+    return pathStrings;
   }
 
 }
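
Read on its own, the hunk above implements a simple rule: with the flag off,
every partition location is passed through unchanged, so a missing directory
still fails the job later; with it on, blank or nonexistent locations are
dropped, and a partition whose locations all disappear is skipped entirely
by getSplits(), since setInputPath() now returns the surviving paths. A
standalone sketch of that rule (class and method names are illustrative,
not part of the patch):

  import java.io.IOException;
  import java.util.Iterator;
  import java.util.List;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;

  public class InvalidPathPruning {
    static List<String> pruneInvalid(JobConf conf, List<String> locations)
        throws IOException {
      boolean ignoreInvalid =
          conf.getBoolean("hcat.input.ignore.invalid.path", false);
      if (!ignoreInvalid) {
        return locations; // legacy behavior: keep every location as-is
      }
      Iterator<String> it = locations.iterator();
      while (it.hasNext()) {
        String loc = it.next();
        if (loc == null || loc.trim().isEmpty()) {
          it.remove(); // nothing to read from a blank location
          continue;
        }
        Path p = new Path(loc);
        FileSystem fs = p.getFileSystem(conf);
        if (!fs.exists(p)) {
          it.remove(); // partition directory deleted out-of-band: skip it
        }
      }
      return locations; // empty list => caller skips this partition
    }
  }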

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
----------------------------------------------------------------------
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 2440cb5..4e23fa2 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -66,6 +66,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
@@ -102,6 +103,7 @@ public class TestHCatLoader {
           add("testReadPartitionedBasic");
           add("testProjectionsBasic");
           add("testColumnarStorePushdown2");
+          add("testReadMissingPartitionBasicNeg");
         }});
       }};
 
@@ -438,6 +440,59 @@ public class TestHCatLoader {
   }
 
   @Test
+  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
+    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+    PigServer server = new PigServer(ExecType.LOCAL);
+
+    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
+    if (!removeDirectory(removedPartitionDir)) {
+      System.out.println("Test did not run because its environment could not be set up.");
+      return;
+    }
+    driver.run("select * from " + PARTITIONED_TABLE);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    assertTrue(valuesReadFromHiveDriver.size() == 6);
+
+    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    Schema dumpedWSchema = server.dumpSchema("W");
+    List<FieldSchema> Wfields = dumpedWSchema.getFields();
+    assertEquals(3, Wfields.size());
+    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+    assertTrue(Wfields.get(0).type == DataType.INTEGER);
+    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+    try {
+      Iterator<Tuple> WIter = server.openIterator("W");
+      fail("Should failed in retriving an invalid partition");
+    } catch (IOException ioe) {
+      // expected
+    }
+  }
+
+  private static boolean removeDirectory(File dir) {
+    boolean success = false;
+    if (dir.isDirectory()) {
+      File[] files = dir.listFiles();
+      if (files != null && files.length > 0) {
+        for (File file : files) {
+          success = removeDirectory(file);
+          if (!success) {
+            return false;
+          }
+        }
+      }
+      success = dir.delete();
+    } else {
+      success = dir.delete();
+    }
+    return success;
+  }
+
+  @Test
   public void testProjectionsBasic() throws IOException {
     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));