You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/13 17:09:02 UTC

svn commit: r1384405 - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/mapreduce/

Author: travis
Date: Thu Sep 13 17:09:01 2012
New Revision: 1384405

URL: http://svn.apache.org/viewvc?rev=1384405&view=rev
Log:
HCATALOG-500 HCatStorer should honor user-specified path for external tables

Added:
    incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
    incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Sep 13 17:09:01 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-500 HCatStorer should honor user-specified path for external tables (pengfeng via traviscrawford)
+
   HCAT-493 Convert classes with 2 space indentation to 4 space indentation for consistent style (amalakar via traviscrawford)
 
   HCAT-489 HCatalog style cleanups and readd javac debug option (traviscrawford)

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java Thu Sep 13 17:09:01 2012
@@ -116,6 +116,10 @@ public class HCatStorer extends HCatBase
                     "Schema for data cannot be determined.",
                     PigHCatUtil.PIG_EXCEPTION_CODE);
             }
+            String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
+            if (externalLocation != null) {
+                outputJobInfo.setLocation(externalLocation);
+            }
             try {
                 HCatOutputFormat.setOutput(job, outputJobInfo);
             } catch (HCatException he) {

Added: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java?rev=1384405&view=auto
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java (added)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java Thu Sep 13 17:09:01 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class is used to test the HCAT_PIG_STORER_EXTERNAL_LOCATION property used in HCatStorer.
+ * When this property is set, HCatStorer writes the output to the location it specifies. Since
+ * the property can only be set in the UDFContext, we need this simple wrapper to do three things:
+ * <ol>
+ * <li> save the external dir specified in the Pig script </li>
+ * <li> set the same UDFContext signature as HCatStorer </li>
+ * <li> before {@link HCatStorer#setStoreLocation(String, Job)}, set the external dir in the UDFContext.</li>
+ * </ol>
+ */
+public class HCatStorerWrapper extends HCatStorer {
+
+    private String sign;
+    private String externalDir;
+
+    public HCatStorerWrapper(String partSpecs, String schema, String externalDir) throws Exception {
+	super(partSpecs, schema);
+	this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String partSpecs, String externalDir) throws Exception {
+	super(partSpecs);
+	this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String externalDir) throws Exception{
+	super();
+	this.externalDir = externalDir;
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+	Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+		this.getClass(), new String[] { sign });
+	udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, externalDir);
+	super.setStoreLocation(location, job);
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+	sign = signature;
+	super.setStoreFuncUDFContextSignature(signature);
+    }
+}

Added: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java?rev=1384405&view=auto
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java (added)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java Thu Sep 13 17:09:01 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test checks the {@link HCatConstants#HCAT_PIG_STORER_EXTERNAL_LOCATION} property that we can set in the
+ * UDFContext of {@link HCatStorer} so that it writes to the specified external location.
+ *
+ * Since {@link HCatStorer} does not allow extra parameters in the constructor, we use {@link HCatStorerWrapper}
+ * that always treats the last parameter as the external path.
+ */
+public class TestHCatStorerWrapper extends HCatBaseTest {
+
+    private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+
+    @Test
+    public void testStoreExternalTableWithExternalDir() throws IOException, CommandNeedRetryException{
+
+	File tmpExternalDir = new File(SystemUtils.getJavaIoTmpDir(), UUID.randomUUID().toString());
+	tmpExternalDir.deleteOnExit();
+
+	String part_val = "100";
+
+	driver.run("drop table junit_external");
+	String createTable = "create external table junit_external(a int, b string) partitioned by (c string) stored as RCFILE";
+	Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+	int LOOP_SIZE = 3;
+	String[] inputData = new String[LOOP_SIZE*LOOP_SIZE];
+	int k = 0;
+	for(int i = 1; i <= LOOP_SIZE; i++) {
+	    String si = i + "";
+	    for(int j=1;j<=LOOP_SIZE;j++) {
+		inputData[k++] = si + "\t"+j;
+	    }
+	}
+	HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	logAndRegister(server, "A = load '"+INPUT_FILE_NAME+"' as (a:int, b:chararray);");
+	logAndRegister(server, "store A into 'default.junit_external' using " + HCatStorerWrapper.class.getName()
+		+ "('c=" + part_val + "','" + tmpExternalDir.getAbsolutePath() + "');");
+	server.executeBatch();
+
+	Assert.assertTrue(tmpExternalDir.exists());
+	Assert.assertTrue(new File(tmpExternalDir.getAbsoluteFile() + "/" + "part-m-00000").exists());
+
+	driver.run("select * from junit_external");
+	ArrayList<String> res = new ArrayList<String>();
+	driver.getResults(res);
+	driver.run("drop table junit_external");
+	Iterator<String> itr = res.iterator();
+	for(int i = 1; i <= LOOP_SIZE; i++) {
+	    String si = i + "";
+	    for(int j=1;j<=LOOP_SIZE;j++) {
+		Assert.assertEquals( si + "\t" + j + "\t" + part_val,itr.next());
+	    }
+	}
+	Assert.assertFalse(itr.hasNext());
+
+    }
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Thu Sep 13 17:09:01 2012
@@ -43,6 +43,14 @@ public final class HCatConstants {
     public static final String HCAT_PIG_INNER_FIELD_NAME = "hcat.pig.inner.field.name";
     public static final String HCAT_PIG_INNER_FIELD_NAME_DEFAULT = "innerfield";
 
+    /**
+     * {@value} (default: null)
+     * When the property is set in the UDFContext of the {@link HCatStorer}, {@link HCatStorer} writes
+     * to the location it specifies instead of the default HCatalog location format. An example can be found
+     * in {@link HCatStorerWrapper}.
+     */
+    public static final String HCAT_PIG_STORER_EXTERNAL_LOCATION = HCAT_PIG_STORER + ".external.location";
+
     //The keys used to store info into the job Configuration
     public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Thu Sep 13 17:09:01 2012
@@ -288,7 +288,7 @@ class FileOutputCommitterContainer exten
             if (!dynamicPartitioningUsed) {
                 partitionsToAdd.add(
                     constructPartition(
-                        context,
+                        context, jobInfo,
                         tblPath.toString(), jobInfo.getPartitionValues()
                         , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                         , table, fs
@@ -297,7 +297,7 @@ class FileOutputCommitterContainer exten
                 for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
                     partitionsToAdd.add(
                         constructPartition(
-                            context,
+                            context, jobInfo,
                             getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue()
                             , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                             , table, fs
@@ -402,6 +402,8 @@ class FileOutputCommitterContainer exten
 
     /**
      * Generate partition metadata object to be used to add to metadata.
+     * @param context The job context.
+     * @param jobInfo The OutputJobInfo.
      * @param partLocnRoot The table-equivalent location root of the partition
      *                       (temporary dir if dynamic partition, table dir if static)
      * @param partKVs The keyvalue pairs that form the partition
@@ -416,7 +418,7 @@ class FileOutputCommitterContainer exten
      */
 
     private Partition constructPartition(
-        JobContext context,
+        JobContext context, OutputJobInfo jobInfo,
         String partLocnRoot, Map<String, String> partKVs,
         HCatSchema outputSchema, Map<String, String> params,
         Table table, FileSystem fs,
@@ -440,16 +442,26 @@ class FileOutputCommitterContainer exten
 
         // Sets permissions and group name on partition dirs and files.
 
-        Path partPath = new Path(partLocnRoot);
-        int i = 0;
-        for (FieldSchema partKey : table.getPartitionKeys()) {
-            if (i++ != 0) {
-                applyGroupAndPerms(fs, partPath, perms, grpName, false);
+        Path partPath;
+        if (Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+               && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
+            // honor external table that specifies the location
+            partPath = new Path(jobInfo.getLocation());
+        } else {
+            partPath = new Path(partLocnRoot);
+            int i = 0;
+            for (FieldSchema partKey : table.getPartitionKeys()) {
+                if (i++ != 0) {
+                    applyGroupAndPerms(fs, partPath, perms, grpName, false);
+                }
+                partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
             }
-            partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
         }
+
         // Apply the group and permissions to the leaf partition and files.
         applyGroupAndPerms(fs, partPath, perms, grpName, true);
+
+        // Set the location in the StorageDescriptor
         if (dynamicPartitioningUsed) {
             String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs);
             if (harProcessor.isEnabled()) {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java Thu Sep 13 17:09:01 2012
@@ -116,8 +116,12 @@ public class FosterStorageHandler extend
 
             String outputLocation;
 
-            // For non-partitioned tables, we send them to the temp dir
-            if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
+            if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+                   && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
+                // honor external table that specifies the location
+                outputLocation = jobInfo.getLocation();
+            } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
+                // For non-partitioned tables, we send them to the temp dir
                 outputLocation = TEMP_DIR_NAME;
             } else {
                 List<String> cols = new ArrayList<String>();