You are viewing a plain text version of this content. The canonical copy is the original message in the commits@hive.apache.org mailing list archive.
Posted to commits@hive.apache.org by am...@apache.org on 2012/01/12 05:11:39 UTC

svn commit: r1230391 [1/2] - in /hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/io/ shims/ shims/src/0.20S/java/org/apache/hadoop/hive/shims/ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/c...

Author: amareshwari
Date: Thu Jan 12 04:11:37 2012
New Revision: 1230391

URL: http://svn.apache.org/viewvc?rev=1230391&view=rev
Log:
HIVE-2629. Make a single Hive binary work with both 0.20.x and 0.23.0. (Thomas Weise via amareshwari)

Added:
    hive/trunk/shims/src/common-secure/
    hive/trunk/shims/src/common-secure/java/
    hive/trunk/shims/src/common-secure/java/org/
    hive/trunk/shims/src/common-secure/java/org/apache/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
Removed:
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
    hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java
    hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java
Modified:
    hive/trunk/build-common.xml
    hive/trunk/build.properties
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
    hive/trunk/shims/build.xml
    hive/trunk/shims/ivy.xml
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java

Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Thu Jan 12 04:11:37 2012
@@ -115,14 +115,14 @@
       log="${ivyresolvelog}"/>
   </target>
 
-
-  <target name="ivy-retrieve-hadoop-source" depends="ivy-init-settings"
+  <target name="ivy-retrieve-hadoop-source"
     description="Retrieve Ivy-managed Hadoop source artifacts" unless="ivy.skip">
     <echo message="Project: ${ant.project.name}"/>
-    <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings"
+  	<echo message="hadoop.version.ant-internal: ${hadoop.version.ant-internal}"/>
+  	<ivy:settings id="${ant.project.name}-${hadoop.version.ant-internal}.ivy.settings" file="${ivysettings.xml}"/>
+  	<ivy:retrieve settingsRef="${ant.project.name}-${hadoop.version.ant-internal}.ivy.settings"
       pattern="${build.dir.hadoop}/[artifact]-[revision].[ext]"/>
   </target>
-
   
   <available property="hadoopcore.${hadoop.version.ant-internal}.install.done"
     file="${build.dir.hadoop}/hadoop-${hadoop.version.ant-internal}.installed"/>

Modified: hive/trunk/build.properties
URL: http://svn.apache.org/viewvc/hive/trunk/build.properties?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/build.properties (original)
+++ hive/trunk/build.properties Thu Jan 12 04:11:37 2012
@@ -10,9 +10,11 @@ javac.deprecation=off
 javac.args=
 javac.args.warnings=
 
-hadoop.version=0.20.1
-hadoop.security.version=0.20.3-CDH3-SNAPSHOT
-hadoop.security.version.prefix=0.20S
+hadoop-0.20.version=0.20.1
+hadoop-0.20S.version=0.20.3-CDH3-SNAPSHOT
+hadoop-0.23.version=0.23.0
+hadoop.version=${hadoop-0.20.version}
+hadoop.security.version=${hadoop-0.20S.version}
 hadoop.mirror=http://mirror.facebook.net/facebook/hive-deps
 hadoop.mirror2=http://archive.cloudera.com/hive-deps
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java Thu Jan 12 04:11:37 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.compress.*;
  */
 public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
 
-  protected SchemaAwareCompressionInputStream(InputStream in) {
+  protected SchemaAwareCompressionInputStream(InputStream in) throws java.io.IOException {
     super(in);
   }
 

Modified: hive/trunk/shims/build.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/build.xml?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/shims/build.xml (original)
+++ hive/trunk/shims/build.xml Thu Jan 12 04:11:37 2012
@@ -51,50 +51,49 @@ to call at top-level: ant deploy-contrib
     <path refid="common-classpath"/>
   </path>
 
-  <target name="build_shims" depends="install-hadoopcore-internal, ivy-retrieve-hadoop-source"
+  <!-- default list of shims to build -->
+  <property name="shims.include" value="0.20,0.20S,0.23"/>
+  <property name="shims.common.sources" value="${basedir}/src/common/java"/>	
+  <property name="shims.common.secure.sources" value="${basedir}/src/common/java;${basedir}/src/common-secure/java"/>
+  <!-- sources and hadoop version for each shim -->
+  <property name="shims.0.20.sources" value="${shims.common.sources};${basedir}/src/0.20/java" />	
+  <property name="shims.0.20.version" value="${hadoop-0.20.version}" />	
+  <property name="shims.0.20S.sources" value="${shims.common.secure.sources};${basedir}/src/0.20S/java" />	
+  <property name="shims.0.20S.version" value="${hadoop-0.20S.version}" />	
+  <property name="shims.0.23.sources" value="${shims.common.secure.sources};${basedir}/src/0.23/java" />	
+  <property name="shims.0.23.version" value="${hadoop-0.23.version}" />	
+	
+  <target name="build_shims" depends="install-hadoopcore-internal"
           description="Build shims against a particular hadoop version">
     <echo message="Project: ${ant.project.name}"/>
-    <getversionpref property="hadoop.version.ant-internal.prefix" input="${hadoop.version.ant-internal}" />
-    <echo message="Compiling shims against hadoop ${hadoop.version.ant-internal} (${hadoop.root})"/>
+    <echo message="Compiling ${sources} against hadoop ${hadoop.version.ant-internal} (${hadoop.root})"/>
     <javac
      encoding="${build.encoding}"
      includes="**/*.java"
-     excludes="**/Proxy*.java"
      destdir="${build.classes}"
      debug="${javac.debug}"
      deprecation="${javac.deprecation}"
+     srcdir="${sources}"
      includeantruntime="false">
       <compilerarg line="${javac.args} ${javac.args.warnings}" />
       <classpath refid="classpath"/>
-      <src path="${basedir}/src/${hadoop.version.ant-internal.prefix}/java" />
-      <src path="${basedir}/src/common/java" />
     </javac>
   </target>
-
+	
   <target name="compile" depends="init,ivy-retrieve">
-    <echo message="Project: ${ant.project.name}"/>
-    <antcall target="build_shims" inheritRefs="false" inheritAll="false">
-      <param name="hadoop.version.ant-internal" value="${hadoop.version}" />
-    </antcall>
-    <antcall target="build_shims" inheritRefs="false" inheritAll="false">
-      <param name="hadoop.version.ant-internal" value="${hadoop.security.version}" />
-      <param name="hadoop.version.ant-internal.prefix" value="${hadoop.security.version.prefix}" />
-    </antcall>
-    <getversionpref property="hadoop.version.ant-internal.prefix" input="${hadoop.version}" />
-    <javac
-     encoding="${build.encoding}"
-     includes="**/Proxy*.java"
-     destdir="${build.classes}"
-     debug="${javac.debug}"
-     deprecation="${javac.deprecation}"
-     includeantruntime="false">
-      <compilerarg line="${javac.args} ${javac.args.warnings}" />
-      <classpath refid="classpath"/>
-      <src path="${basedir}/src/common/java" />
-    </javac>
+  		<echo message="Project: ${ant.project.name}"/>
+		<for param="shimName" list="${shims.include}">
+		  <sequential>
+		    <echo>Building shims @{shimName}</echo>
+		  	<antcall target="build_shims" inheritRefs="false" inheritAll="false">
+		      <param name="hadoop.version.ant-internal" value="${shims.@{shimName}.version}" />
+		      <param name="sources" value="${shims.@{shimName}.sources}" />
+		    </antcall>
+		  </sequential>
+	  	</for>  	
   </target>
   
-  <target name="compile_secure_test" depends="install-hadoopcore-internal, ivy-retrieve-hadoop-source" 
+  <target name="compile_secure_test" depends="install-hadoopcore-internal" 
           description="Test shims against a particular hadoop version">
     <echo message="Project: ${ant.project.name}"/>
     <getversionpref property="hadoop.version.ant-internal.prefix" input="${hadoop.version.ant-internal}" />
@@ -133,9 +132,8 @@ to call at top-level: ant deploy-contrib
   <target name="compile-test" depends="compile">
     <echo message="Project: ${ant.project.name}"/>
     <!-- TODO: move tests to version directory -->
-    <!--antcall target="compile_secure_test" inheritRefs="false" inheritAll="false">
+    <antcall target="compile_secure_test" inheritRefs="false" inheritAll="false">
       <param name="hadoop.version.ant-internal" value="${hadoop.security.version}" />
-      <param name="hadoop.version.ant-internal.prefix" value="${hadoop.security.version.prefix}" />
-    </antcall-->
+    </antcall>
   </target>
 </project>

Modified: hive/trunk/shims/ivy.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/ivy.xml?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/shims/ivy.xml (original)
+++ hive/trunk/shims/ivy.xml Thu Jan 12 04:11:37 2012
@@ -27,10 +27,7 @@
     <include file="${ivy.conf.dir}/common-configurations.xml"/>
   </configurations>
   <dependencies>
-    <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop.version}">
-      <artifact name="hadoop" type="source" ext="tar.gz"/>
-    </dependency> 
-    <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop.security.version}">
+    <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop.version.ant-internal}">
       <artifact name="hadoop" type="source" ext="tar.gz"/>
     </dependency>
     <dependency org="org.apache.zookeeper" name="zookeeper"

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Jan 12 04:11:37 2012
@@ -17,510 +17,17 @@
  */
 package org.apache.hadoop.hive.shims;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ToolRunner;
 
 /**
- * Implemention of shims against Hadoop 0.20.0.
+ * Implemention of shims against Hadoop 0.20 with Security.
  */
-public class Hadoop20SShims implements HadoopShims {
-  public boolean usesJobShell() {
-    return false;
-  }
-
-  public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
-      throws IOException {
-
-    return fs.deleteOnExit(path);
-  }
-
-  public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
-      throws IOException {
-    // gone in 0.18+
-  }
-
-  public boolean isJobPreparing(RunningJob job) throws IOException {
-    return job.getJobState() == JobStatus.PREP;
-  }
-  /**
-   * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
-   */
-  public void setTmpFiles(String prop, String files) {
-    // gone in 20+
-  }
-
-  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
-      int numDataNodes,
-      boolean format,
-      String[] racks) throws IOException {
-    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
-  }
-
-  /**
-   * MiniDFSShim.
-   *
-   */
-  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
-    private final MiniDFSCluster cluster;
-
-    public MiniDFSShim(MiniDFSCluster cluster) {
-      this.cluster = cluster;
-    }
-
-    public FileSystem getFileSystem() throws IOException {
-      return cluster.getFileSystem();
-    }
-
-    public void shutdown() {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * We define this function here to make the code compatible between
-   * hadoop 0.17 and hadoop 0.20.
-   *
-   * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
-   * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
-   * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
-   * references that class, which is not available in hadoop 0.17.
-   */
-  public int compareText(Text a, Text b) {
-    return a.compareTo(b);
-  }
-
-  @Override
-  public long getAccessTime(FileStatus file) {
-    return file.getAccessTime();
-  }
-
-  public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
-    return new CombineFileInputFormatShim() {
-      @Override
-      public RecordReader getRecordReader(InputSplit split,
-          JobConf job, Reporter reporter) throws IOException {
-        throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
-      }
-    };
-  }
-
-  public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
-    long shrinkedLength;
-    boolean _isShrinked;
-    public InputSplitShim() {
-      super();
-      _isShrinked = false;
-    }
-
-    public InputSplitShim(CombineFileSplit old) throws IOException {
-      super(old);
-      _isShrinked = false;
-    }
-
-    @Override
-    public void shrinkSplit(long length) {
-      _isShrinked = true;
-      shrinkedLength = length;
-    }
-
-    public boolean isShrinked() {
-      return _isShrinked;
-    }
-
-    public long getShrinkedLength() {
-      return shrinkedLength;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      _isShrinked = in.readBoolean();
-      if (_isShrinked) {
-        shrinkedLength = in.readLong();
-      }
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      out.writeBoolean(_isShrinked);
-      if (_isShrinked) {
-        out.writeLong(shrinkedLength);
-      }
-    }
-  }
-
-  /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
-   * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
-   * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
-   */
-  public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
-
-    static final Class[] constructorSignature = new Class[] {
-        InputSplit.class,
-        Configuration.class,
-        Reporter.class,
-        Integer.class
-        };
-
-    protected CombineFileSplit split;
-    protected JobConf jc;
-    protected Reporter reporter;
-    protected Class<RecordReader<K, V>> rrClass;
-    protected Constructor<RecordReader<K, V>> rrConstructor;
-    protected FileSystem fs;
-
-    protected int idx;
-    protected long progress;
-    protected RecordReader<K, V> curReader;
-    protected boolean isShrinked;
-    protected long shrinkedLength;
-
-    public boolean next(K key, V value) throws IOException {
-
-      while ((curReader == null)
-          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
-              value)) {
-        if (!initNextRecordReader(key)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    public K createKey() {
-      K newKey = curReader.createKey();
-      return (K)(new CombineHiveKey(newKey));
-    }
-
-    public V createValue() {
-      return curReader.createValue();
-    }
-
-    /**
-     * Return the amount of data processed.
-     */
-    public long getPos() throws IOException {
-      return progress;
-    }
-
-    public void close() throws IOException {
-      if (curReader != null) {
-        curReader.close();
-        curReader = null;
-      }
-    }
-
-    /**
-     * Return progress based on the amount of data processed so far.
-     */
-    public float getProgress() throws IOException {
-      return Math.min(1.0f, progress / (float) (split.getLength()));
-    }
-
-    /**
-     * A generic RecordReader that can hand out different recordReaders
-     * for each chunk in the CombineFileSplit.
-     */
-    public CombineFileRecordReader(JobConf job, CombineFileSplit split,
-        Reporter reporter,
-        Class<RecordReader<K, V>> rrClass)
-        throws IOException {
-      this.split = split;
-      this.jc = job;
-      this.rrClass = rrClass;
-      this.reporter = reporter;
-      this.idx = 0;
-      this.curReader = null;
-      this.progress = 0;
-
-      isShrinked = false;
-
-      assert (split instanceof Hadoop20Shims.InputSplitShim);
-      if (((InputSplitShim) split).isShrinked()) {
-        isShrinked = true;
-        shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
-      }      
-      
-      try {
-        rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
-        rrConstructor.setAccessible(true);
-      } catch (Exception e) {
-        throw new RuntimeException(rrClass.getName() +
-            " does not have valid constructor", e);
-      }
-      initNextRecordReader(null);
-    }
-    
-    /**
-     * do next and handle exception inside it. 
-     * @param key
-     * @param value
-     * @return
-     * @throws IOException
-     */
-    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
-      try {
-        return curReader.next(key, value);
-      } catch (Exception e) {
-        return HiveIOExceptionHandlerUtil
-            .handleRecordReaderNextException(e, jc);
-      }
-    }
-
-    /**
-     * Get the record reader for the next chunk in this CombineFileSplit.
-     */
-    protected boolean initNextRecordReader(K key) throws IOException {
-
-      if (curReader != null) {
-        curReader.close();
-        curReader = null;
-        if (idx > 0) {
-          progress += split.getLength(idx - 1); // done processing so far
-        }
-      }
-
-      // if all chunks have been processed, nothing more to do.
-      if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
-        return false;
-      }
-
-      // get a record reader for the idx-th chunk
-      try {
-        curReader = rrConstructor.newInstance(new Object[]
-            {split, jc, reporter, Integer.valueOf(idx)});
-
-        // change the key if need be
-        if (key != null) {
-          K newKey = curReader.createKey();
-          ((CombineHiveKey)key).setKey(newKey);
-        }
-
-        // setup some helper config variables.
-        jc.set("map.input.file", split.getPath(idx).toString());
-        jc.setLong("map.input.start", split.getOffset(idx));
-        jc.setLong("map.input.length", split.getLength(idx));
-      } catch (Exception e) {
-        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
-            e, jc);
-      }
-      idx++;
-      return true;
-    }
-  }
-
-  public abstract static class CombineFileInputFormatShim<K, V> extends
-      CombineFileInputFormat<K, V>
-      implements HadoopShims.CombineFileInputFormatShim<K, V> {
-
-    public Path[] getInputPathsShim(JobConf conf) {
-      try {
-        return FileInputFormat.getInputPaths(conf);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void createPool(JobConf conf, PathFilter... filters) {
-      super.createPool(conf, filters);
-    }
-
-    @Override
-    public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
-      long minSize = job.getLong("mapred.min.split.size", 0);
-
-      // For backward compatibility, let the above parameter be used
-      if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
-        super.setMinSplitSizeNode(minSize);
-      }
-
-      if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
-        super.setMinSplitSizeRack(minSize);
-      }
-
-      if (job.getLong("mapred.max.split.size", 0) == 0) {
-        super.setMaxSplitSize(minSize);
-      }
-
-      CombineFileSplit[] splits = (CombineFileSplit[]) super.getSplits(job, numSplits);
-
-      InputSplitShim[] isplits = new InputSplitShim[splits.length];
-      for (int pos = 0; pos < splits.length; pos++) {
-        isplits[pos] = new InputSplitShim(splits[pos]);
-      }
-
-      return isplits;
-    }
-
-    public InputSplitShim getInputSplitShim() throws IOException {
-      return new InputSplitShim();
-    }
-
-    public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
-        Reporter reporter,
-        Class<RecordReader<K, V>> rrClass)
-        throws IOException {
-      CombineFileSplit cfSplit = (CombineFileSplit) split;
-      return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
-    }
-
-  }
-
-  public String getInputFormatClassName() {
-    return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
-  }
-
-  String[] ret = new String[2];
-
-  @Override
-  public String[] getTaskJobIDs(TaskCompletionEvent t) {
-    TaskID tid = t.getTaskAttemptId().getTaskID();
-    ret[0] = tid.toString();
-    ret[1] = tid.getJobID().toString();
-    return ret;
-  }
-
-  public void setFloatConf(Configuration conf, String varName, float val) {
-    conf.setFloat(varName, val);
-  }
-
-  @Override
-  public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
-      String archiveName) throws Exception {
-
-    HadoopArchives har = new HadoopArchives(conf);
-    List<String> args = new ArrayList<String>();
-
-    if (conf.get("hive.archive.har.parentdir.settable") == null) {
-      throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
-    }
-    boolean parentSettable =
-      conf.getBoolean("hive.archive.har.parentdir.settable", false);
-
-    if (parentSettable) {
-      args.add("-archiveName");
-      args.add(archiveName);
-      args.add("-p");
-      args.add(sourceDir.toString());
-      args.add(destDir.toString());
-    } else {
-      args.add("-archiveName");
-      args.add(archiveName);
-      args.add(sourceDir.toString());
-      args.add(destDir.toString());
-    }
-
-    return ToolRunner.run(har, args.toArray(new String[0]));
-  }
-
-  public static class NullOutputCommitter extends OutputCommitter {
-    @Override
-    public void setupJob(JobContext jobContext) { }
-    @Override
-    public void cleanupJob(JobContext jobContext) { }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskContext) { }
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return false;
-    }
-    @Override
-    public void commitTask(TaskAttemptContext taskContext) { }
-    @Override
-    public void abortTask(TaskAttemptContext taskContext) { }
-  }
-
-  public void setNullOutputFormat(JobConf conf) {
-    conf.setOutputFormat(NullOutputFormat.class);
-    conf.setOutputCommitter(Hadoop20Shims.NullOutputCommitter.class);
-
-    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
-    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
-
-    // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
-    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-    conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
-  }
-
-  @Override
-  public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
-    return UserGroupInformation.getCurrentUser();
-  }
-  
-  @Override
-  public boolean isSecureShimImpl() {
-    return true;
-  }
-  
-  @Override
-  public String getShortUserName(UserGroupInformation ugi) {
-    return ugi.getShortUserName();
-  }
-
-  @Override
-  public String getTokenStrForm(String tokenSignature) throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
-
-    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
-    return token != null ? token.encodeToUrlString() : null;
-  }
-  
-  @Override
-  public void doAs(UserGroupInformation ugi, PrivilegedExceptionAction<Void> pvea) throws IOException, InterruptedException {
-    ugi.doAs(pvea);
-  }
-
-  @Override
-  public UserGroupInformation createRemoteUser(String userName, List<String> groupNames) {
-    return UserGroupInformation.createRemoteUser(userName);
-  }
+public class Hadoop20SShims extends HadoopShimsSecure {
 
   @Override
   public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
@@ -535,7 +42,7 @@ public class Hadoop20SShims implements H
       throw new Exception(errorMsg);
     }
   }
-  
+
   @Override
   public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
     return new org.apache.hadoop.mapreduce.TaskAttemptContext(conf, new TaskAttemptID()) {

Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1230391&r1=1230390&r2=1230391&view=diff
==============================================================================
--- hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Jan 12 04:11:37 2012
@@ -17,505 +17,22 @@
  */
 package org.apache.hadoop.hive.shims;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.security.auth.login.LoginException;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-import org.apache.hadoop.hive.thrift.DelegationTokenSelector23;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
-import org.apache.hadoop.mapred.lib.CombineFileSplit;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Implemention of shims against Hadoop 0.23.0.
  */
-public class Hadoop23Shims implements HadoopShims {
-  public boolean usesJobShell() {
-    return false;
-  }
-
-  public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
-      throws IOException {
-
-    return fs.deleteOnExit(path);
-  }
-
-  public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
-      throws IOException {
-    // gone in 0.18+
-  }
-
-  public boolean isJobPreparing(RunningJob job) throws IOException {
-    return job.getJobState() == JobStatus.PREP;
-  }
-  /**
-   * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
-   */
-  public void setTmpFiles(String prop, String files) {
-    // gone in 20+
-  }
-
-  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
-      int numDataNodes,
-      boolean format,
-      String[] racks) throws IOException {
-    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
-  }
-
-  /**
-   * MiniDFSShim.
-   *
-   */
-  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
-    private final MiniDFSCluster cluster;
-
-    public MiniDFSShim(MiniDFSCluster cluster) {
-      this.cluster = cluster;
-    }
-
-    public FileSystem getFileSystem() throws IOException {
-      return cluster.getFileSystem();
-    }
-
-    public void shutdown() {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * We define this function here to make the code compatible between
-   * hadoop 0.17 and hadoop 0.20.
-   *
-   * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
-   * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
-   * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
-   * references that class, which is not available in hadoop 0.17.
-   */
-  public int compareText(Text a, Text b) {
-    return a.compareTo(b);
-  }
-
-  @Override
-  public long getAccessTime(FileStatus file) {
-    return file.getAccessTime();
-  }
-
-  public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
-    return new CombineFileInputFormatShim() {
-      @Override
-      public RecordReader getRecordReader(InputSplit split,
-          JobConf job, Reporter reporter) throws IOException {
-        throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
-      }
-    };
-  }
-
-  public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
-    long shrinkedLength;
-    boolean _isShrinked;
-    public InputSplitShim() {
-      super();
-      _isShrinked = false;
-    }
-
-    public InputSplitShim(CombineFileSplit old) throws IOException {
-      super(old);
-      _isShrinked = false;
-    }
-
-    @Override
-    public void shrinkSplit(long length) {
-      _isShrinked = true;
-      shrinkedLength = length;
-    }
-
-    public boolean isShrinked() {
-      return _isShrinked;
-    }
-
-    public long getShrinkedLength() {
-      return shrinkedLength;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      _isShrinked = in.readBoolean();
-      if (_isShrinked) {
-        shrinkedLength = in.readLong();
-      }
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      out.writeBoolean(_isShrinked);
-      if (_isShrinked) {
-        out.writeLong(shrinkedLength);
-      }
-    }
-  }
-
-  /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
-   * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
-   * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
-   */
-  public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
-
-    static final Class[] constructorSignature = new Class[] {
-        InputSplit.class,
-        Configuration.class,
-        Reporter.class,
-        Integer.class
-        };
-
-    protected CombineFileSplit split;
-    protected JobConf jc;
-    protected Reporter reporter;
-    protected Class<RecordReader<K, V>> rrClass;
-    protected Constructor<RecordReader<K, V>> rrConstructor;
-    protected FileSystem fs;
-
-    protected int idx;
-    protected long progress;
-    protected RecordReader<K, V> curReader;
-    protected boolean isShrinked;
-    protected long shrinkedLength;
-    
-    public boolean next(K key, V value) throws IOException {
-
-      while ((curReader == null)
-          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
-              value)) {
-        if (!initNextRecordReader(key)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    public K createKey() {
-      K newKey = curReader.createKey();
-      return (K)(new CombineHiveKey(newKey));
-    }
-
-    public V createValue() {
-      return curReader.createValue();
-    }
-
-    /**
-     * Return the amount of data processed.
-     */
-    public long getPos() throws IOException {
-      return progress;
-    }
-
-    public void close() throws IOException {
-      if (curReader != null) {
-        curReader.close();
-        curReader = null;
-      }
-    }
-
-    /**
-     * Return progress based on the amount of data processed so far.
-     */
-    public float getProgress() throws IOException {
-      return Math.min(1.0f, progress / (float) (split.getLength()));
-    }
-
-    /**
-     * A generic RecordReader that can hand out different recordReaders
-     * for each chunk in the CombineFileSplit.
-     */
-    public CombineFileRecordReader(JobConf job, CombineFileSplit split,
-        Reporter reporter,
-        Class<RecordReader<K, V>> rrClass)
-        throws IOException {
-      this.split = split;
-      this.jc = job;
-      this.rrClass = rrClass;
-      this.reporter = reporter;
-      this.idx = 0;
-      this.curReader = null;
-      this.progress = 0;
-
-      isShrinked = false;
-
-      assert (split instanceof InputSplitShim);
-      if (((InputSplitShim) split).isShrinked()) {
-        isShrinked = true;
-        shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
-      }
-
-      try {
-        rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
-        rrConstructor.setAccessible(true);
-      } catch (Exception e) {
-        throw new RuntimeException(rrClass.getName() +
-            " does not have valid constructor", e);
-      }
-      initNextRecordReader(null);
-    }
-    
-    /**
-     * do next and handle exception inside it. 
-     * @param key
-     * @param value
-     * @return
-     * @throws IOException
-     */
-    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
-      try {
-        return curReader.next(key, value);
-      } catch (Exception e) {
-        return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
-      }
-    }
-
-    /**
-     * Get the record reader for the next chunk in this CombineFileSplit.
-     */
-    protected boolean initNextRecordReader(K key) throws IOException {
-
-      if (curReader != null) {
-        curReader.close();
-        curReader = null;
-        if (idx > 0) {
-          progress += split.getLength(idx - 1); // done processing so far
-        }
-      }
-
-      // if all chunks have been processed or reached the length, nothing more to do.
-      if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
-        return false;
-      }
-
-      // get a record reader for the idx-th chunk
-      try {
-        curReader = rrConstructor.newInstance(new Object[]
-            {split, jc, reporter, Integer.valueOf(idx)});
-
-        // change the key if need be
-        if (key != null) {
-          K newKey = curReader.createKey();
-          ((CombineHiveKey)key).setKey(newKey);
-        }
-
-        // setup some helper config variables.
-        jc.set("map.input.file", split.getPath(idx).toString());
-        jc.setLong("map.input.start", split.getOffset(idx));
-        jc.setLong("map.input.length", split.getLength(idx));
-      } catch (Exception e) {
-        curReader=HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
-      }
-      idx++;
-      return true;
-    }
-  }
-
-  public abstract static class CombineFileInputFormatShim<K, V> extends
-      CombineFileInputFormat<K, V>
-      implements HadoopShims.CombineFileInputFormatShim<K, V> {
-
-    public Path[] getInputPathsShim(JobConf conf) {
-      try {
-        return FileInputFormat.getInputPaths(conf);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void createPool(JobConf conf, PathFilter... filters) {
-      super.createPool(conf, filters);
-    }
-
-    @Override
-    public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
-      long minSize = job.getLong("mapred.min.split.size", 0);
-
-      // For backward compatibility, let the above parameter be used
-      if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
-        super.setMinSplitSizeNode(minSize);
-      }
-
-      if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
-        super.setMinSplitSizeRack(minSize);
-      }
-
-      if (job.getLong("mapred.max.split.size", 0) == 0) {
-        super.setMaxSplitSize(minSize);
-      }
-
-      InputSplit[] splits = super.getSplits(job, numSplits);
-
-      InputSplitShim[] isplits = new InputSplitShim[splits.length];
-      for (int pos = 0; pos < splits.length; pos++) {
-        isplits[pos] = new InputSplitShim((CombineFileSplit) splits[pos]);
-      }
-
-      return isplits;
-    }
-
-    public InputSplitShim getInputSplitShim() throws IOException {
-      return new InputSplitShim();
-    }
-
-    public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
-        Reporter reporter,
-        Class<RecordReader<K, V>> rrClass)
-        throws IOException {
-      CombineFileSplit cfSplit = (CombineFileSplit) split;
-      return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
-    }
-
-  }
-
-  public String getInputFormatClassName() {
-    return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
-  }
-
-  String[] ret = new String[2];
-
-  @Override
-  public String[] getTaskJobIDs(TaskCompletionEvent t) {
-    TaskID tid = t.getTaskAttemptId().getTaskID();
-    ret[0] = tid.toString();
-    ret[1] = tid.getJobID().toString();
-    return ret;
-  }
-
-  public void setFloatConf(Configuration conf, String varName, float val) {
-    conf.setFloat(varName, val);
-  }
+public class Hadoop23Shims extends HadoopShimsSecure {
 
   @Override
-  public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
-      String archiveName) throws Exception {
-
-    HadoopArchives har = new HadoopArchives(conf);
-    List<String> args = new ArrayList<String>();
-
-    if (conf.get("hive.archive.har.parentdir.settable") == null) {
-      throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
-    }
-    boolean parentSettable =
-      conf.getBoolean("hive.archive.har.parentdir.settable", false);
-
-    if (parentSettable) {
-      args.add("-archiveName");
-      args.add(archiveName);
-      args.add("-p");
-      args.add(sourceDir.toString());
-      args.add(destDir.toString());
-    } else {
-      args.add("-archiveName");
-      args.add(archiveName);
-      args.add(sourceDir.toString());
-      args.add(destDir.toString());
-    }
-
-    return ToolRunner.run(har, args.toArray(new String[0]));
-  }
-
-  public static class NullOutputCommitter extends OutputCommitter {
-    @Override
-    public void setupJob(JobContext jobContext) { }
-    @Override
-    public void cleanupJob(JobContext jobContext) { }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskContext) { }
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return false;
-    }
-    @Override
-    public void commitTask(TaskAttemptContext taskContext) { }
-    @Override
-    public void abortTask(TaskAttemptContext taskContext) { }
-  }
-
-  public void setNullOutputFormat(JobConf conf) {
-    conf.setOutputFormat(NullOutputFormat.class);
-    conf.setOutputCommitter(Hadoop23Shims.NullOutputCommitter.class);
-
-    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
-    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
-
-    // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
-    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-    conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
-  }
-
-  @Override
-  public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
-    return UserGroupInformation.getCurrentUser();
-  }
-
-  @Override
-  public boolean isSecureShimImpl() {
-    return true;
-  }
-
-  @Override
-  public String getShortUserName(UserGroupInformation ugi) {
-    return ugi.getShortUserName();
-  }
-
-  @Override
-  public String getTokenStrForm(String tokenSignature) throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector23();
-
-    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
-    return token != null ? token.encodeToUrlString() : null;
-  }
-  
-  @Override
   public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
     JobTrackerState state;
     switch (clusterStatus.getJobTrackerStatus()) {
@@ -528,7 +45,7 @@ public class Hadoop23Shims implements Ha
       throw new Exception(errorMsg);
     }
   }
-  
+
   @Override
   public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
     return new TaskAttemptContextImpl(conf, new TaskAttemptID()) {

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1230391&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Thu Jan 12 04:11:37 2012
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.shims;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Base implementation for shims against secure Hadoop 0.20.3/0.23.
+ */
+public abstract class HadoopShimsSecure implements HadoopShims {
+  public boolean usesJobShell() {
+    return false;
+  }
+
+  public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
+      throws IOException {
+
+    return fs.deleteOnExit(path);
+  }
+
+  public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
+      throws IOException {
+    // gone in 0.18+
+  }
+
+  public boolean isJobPreparing(RunningJob job) throws IOException {
+    return job.getJobState() == JobStatus.PREP;
+  }
+  /**
+   * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
+   */
+  public void setTmpFiles(String prop, String files) {
+    // gone in 20+
+  }
+
+  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
+      int numDataNodes,
+      boolean format,
+      String[] racks) throws IOException {
+    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
+  }
+
+  /**
+   * Adapts a MiniDFSCluster to the HadoopShims.MiniDFSShim interface by
+   * delegating getFileSystem() and shutdown() to the wrapped cluster.
+   */
+  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
+    private final MiniDFSCluster cluster;
+
+    public MiniDFSShim(MiniDFSCluster cluster) {
+      this.cluster = cluster;
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+      return cluster.getFileSystem();
+    }
+
+    public void shutdown() {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * We define this function here to make the code compatible between
+   * hadoop 0.17 and hadoop 0.20.
+   *
+   * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
+   * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
+   * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
+   * references that class, which is not available in hadoop 0.17.
+   */
+  public int compareText(Text a, Text b) {
+    return a.compareTo(b);
+  }
+
+  @Override
+  public long getAccessTime(FileStatus file) {
+    return file.getAccessTime();
+  }
+
+  public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
+    return new CombineFileInputFormatShim() {
+      @Override
+      public RecordReader getRecordReader(InputSplit split,
+          JobConf job, Reporter reporter) throws IOException {
+        throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
+      }
+    };
+  }
+
+  public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
+    long shrinkedLength;
+    boolean _isShrinked;
+    public InputSplitShim() {
+      super();
+      _isShrinked = false;
+    }
+
+    public InputSplitShim(CombineFileSplit old) throws IOException {
+      super(old);
+      _isShrinked = false;
+    }
+
+    @Override
+    public void shrinkSplit(long length) {
+      _isShrinked = true;
+      shrinkedLength = length;
+    }
+
+    public boolean isShrinked() {
+      return _isShrinked;
+    }
+
+    public long getShrinkedLength() {
+      return shrinkedLength;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      _isShrinked = in.readBoolean();
+      if (_isShrinked) {
+        shrinkedLength = in.readLong();
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeBoolean(_isShrinked);
+      if (_isShrinked) {
+        out.writeLong(shrinkedLength);
+      }
+    }
+  }
+
+  /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
+   * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
+   * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
+   */
+  public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
+
+    static final Class[] constructorSignature = new Class[] {
+        InputSplit.class,
+        Configuration.class,
+        Reporter.class,
+        Integer.class
+        };
+
+    protected CombineFileSplit split;
+    protected JobConf jc;
+    protected Reporter reporter;
+    protected Class<RecordReader<K, V>> rrClass;
+    protected Constructor<RecordReader<K, V>> rrConstructor;
+    protected FileSystem fs;
+
+    protected int idx;
+    protected long progress;
+    protected RecordReader<K, V> curReader;
+    protected boolean isShrinked;
+    protected long shrinkedLength;
+
+    public boolean next(K key, V value) throws IOException {
+
+      while ((curReader == null)
+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+              value)) {
+        if (!initNextRecordReader(key)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    public K createKey() {
+      K newKey = curReader.createKey();
+      return (K)(new CombineHiveKey(newKey));
+    }
+
+    public V createValue() {
+      return curReader.createValue();
+    }
+
+    /**
+     * Return the amount of data processed.
+     */
+    public long getPos() throws IOException {
+      return progress;
+    }
+
+    public void close() throws IOException {
+      if (curReader != null) {
+        curReader.close();
+        curReader = null;
+      }
+    }
+
+    /** Return the fraction of the split processed so far, capped at 1; 0 for a zero-length split. */
+    public float getProgress() throws IOException {
+      long total = split.getLength();
+      // Guard the division: 0 / 0f is NaN, and Math.min(1.0f, NaN) would report NaN progress.
+      return total <= 0 ? 0.0f : Math.min(1.0f, progress / (float) total);
+    }
+
+    /**
+     * A generic RecordReader that can hand out different recordReaders
+     * for each chunk in the CombineFileSplit.
+     */
+    public CombineFileRecordReader(JobConf job, CombineFileSplit split,
+        Reporter reporter,
+        Class<RecordReader<K, V>> rrClass)
+        throws IOException {
+      this.split = split;
+      this.jc = job;
+      this.rrClass = rrClass;
+      this.reporter = reporter;
+      this.idx = 0;
+      this.curReader = null;
+      this.progress = 0;
+
+      isShrinked = false;
+
+      assert (split instanceof InputSplitShim);
+      if (((InputSplitShim) split).isShrinked()) {
+        isShrinked = true;
+        shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
+      }
+
+      try {
+        rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
+        rrConstructor.setAccessible(true);
+      } catch (Exception e) {
+        throw new RuntimeException(rrClass.getName() +
+            " does not have valid constructor", e);
+      }
+      initNextRecordReader(null);
+    }
+
+    /**
+     * Calls next() on the current reader, routing any exception to HiveIOExceptionHandlerUtil.
+     * @param key key object handed to the current reader's next()
+     * @param value value object handed to the current reader's next()
+     * @return the current reader's result, or whatever the exception handler returns
+     * @throws IOException declared for the handler call (presumably rethrown there; confirm)
+     */
+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+      try {
+        return curReader.next(key, value);
+      } catch (Exception e) {
+        return HiveIOExceptionHandlerUtil
+            .handleRecordReaderNextException(e, jc);
+      }
+    }
+
+    /**
+     * Advance to the next chunk's reader; false once chunks run out or the shrunk length is passed.
+     */
+    protected boolean initNextRecordReader(K key) throws IOException {
+
+      if (curReader != null) {
+        curReader.close();
+        curReader = null;
+        if (idx > 0) {
+          progress += split.getLength(idx - 1); // count the chunk just finished as processed
+        }
+      }
+
+      // stop when every chunk has been read, or a shrunk split has gone past its shrunk length.
+      if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
+        return false;
+      }
+
+      // reflectively build a record reader for the idx-th chunk
+      try {
+        curReader = rrConstructor.newInstance(new Object[]
+            {split, jc, reporter, Integer.valueOf(idx)});
+
+        // re-point the caller's CombineHiveKey wrapper at a key created by the new reader
+        if (key != null) {
+          K newKey = curReader.createKey();
+          ((CombineHiveKey)key).setKey(newKey);
+        }
+
+        // publish the current chunk's path, offset and length in the job conf
+        jc.set("map.input.file", split.getPath(idx).toString());
+        jc.setLong("map.input.start", split.getOffset(idx));
+        jc.setLong("map.input.length", split.getLength(idx));
+      } catch (Exception e) {
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
+            e, jc);
+      }
+      idx++;
+      return true;
+    }
+  }
+
+  public abstract static class CombineFileInputFormatShim<K, V> extends
+      CombineFileInputFormat<K, V>
+      implements HadoopShims.CombineFileInputFormatShim<K, V> {
+
+    public Path[] getInputPathsShim(JobConf conf) {
+      try {
+        return FileInputFormat.getInputPaths(conf);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void createPool(JobConf conf, PathFilter... filters) {
+      super.createPool(conf, filters);
+    }
+
+    @Override
+    public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
+      long minSize = job.getLong("mapred.min.split.size", 0);
+
+      // For backward compatibility, let the above parameter be used
+      if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
+        super.setMinSplitSizeNode(minSize);
+      }
+
+      if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
+        super.setMinSplitSizeRack(minSize);
+      }
+
+      if (job.getLong("mapred.max.split.size", 0) == 0) {
+        super.setMaxSplitSize(minSize);
+      }
+
+      InputSplit[] splits = (InputSplit[]) super.getSplits(job, numSplits);
+
+      InputSplitShim[] isplits = new InputSplitShim[splits.length];
+      for (int pos = 0; pos < splits.length; pos++) {
+        isplits[pos] = new InputSplitShim((CombineFileSplit)splits[pos]);
+      }
+
+      return isplits;
+    }
+
+    public InputSplitShim getInputSplitShim() throws IOException {
+      return new InputSplitShim();
+    }
+
+    public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
+        Reporter reporter,
+        Class<RecordReader<K, V>> rrClass)
+        throws IOException {
+      CombineFileSplit cfSplit = (CombineFileSplit) split;
+      return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
+    }
+
+  }
+
+  public String getInputFormatClassName() {
+    return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
+  }
+
+  String[] ret = new String[2];
+
+  /** Returns {task id, job id} for the event's task attempt, as strings, in a new array. */
+  @Override
+  public String[] getTaskJobIDs(TaskCompletionEvent t) {
+    TaskID tid = t.getTaskAttemptId().getTaskID();
+    // Fresh array per call: filling the shared 'ret' field let later calls clobber earlier results.
+    return new String[] {tid.toString(), tid.getJobID().toString()};
+  }
+
+  public void setFloatConf(Configuration conf, String varName, float val) {
+    conf.setFloat(varName, val);
+  }
+
+  /**
+   * Invokes HadoopArchives through ToolRunner with "-archiveName archiveName", an optional
+   * "-p" (only when hive.archive.har.parentdir.settable is true), then sourceDir and destDir.
+   *
+   * @return the value returned by ToolRunner.run
+   * @throws RuntimeException if hive.archive.har.parentdir.settable is not set in conf
+   */
+  @Override
+  public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
+      String archiveName) throws Exception {
+    final String settableKey = "hive.archive.har.parentdir.settable";
+    HadoopArchives archiver = new HadoopArchives(conf);
+
+    if (conf.get(settableKey) == null) {
+      throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
+    }
+
+    // Both invocation forms share every argument except the optional "-p" flag.
+    List<String> toolArgs = new ArrayList<String>();
+    toolArgs.add("-archiveName");
+    toolArgs.add(archiveName);
+    if (conf.getBoolean(settableKey, false)) {
+      toolArgs.add("-p");
+    }
+    toolArgs.add(sourceDir.toString());
+    toolArgs.add(destDir.toString());
+    return ToolRunner.run(archiver, toolArgs.toArray(new String[0]));
+  }
+
+  public static class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) { }
+    @Override
+    public void cleanupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+  }
+
+  public void setNullOutputFormat(JobConf conf) {
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+
+    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+
+    // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
+    conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
+  }
+
+  @Override
+  public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
+    return UserGroupInformation.getCurrentUser();
+  }
+
+  @Override
+  public boolean isSecureShimImpl() {
+    return true;
+  }
+
+  @Override
+  public String getShortUserName(UserGroupInformation ugi) {
+    return ugi.getShortUserName();
+  }
+
+  @Override
+  public String getTokenStrForm(String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+
+    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+    return token != null ? token.encodeToUrlString() : null;
+  }
+
+  @Override
+  public void doAs(UserGroupInformation ugi, PrivilegedExceptionAction<Void> pvea) throws IOException, InterruptedException {
+    ugi.doAs(pvea);
+  }
+
+  @Override
+  public UserGroupInformation createRemoteUser(String userName, List<String> groupNames) {
+    return UserGroupInformation.createRemoteUser(userName);
+  }
+
+  @Override
+  abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception;
+
+  @Override
+  abstract public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable);
+
+  @Override
+  abstract public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job);
+}

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java?rev=1230391&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java Thu Jan 12 04:11:37 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * Identifier for Hive delegation tokens; its kind is {@link #HIVE_DELEGATION_KIND}.
+ */
+public class DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+
+  /** Kind label returned by {@link #getKind()} for every instance of this class. */
+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+  /** Create an empty delegation token identifier for reading into. */
+  public DelegationTokenIdentifier() {
+    super();
+  }
+
+  /**
+   * Create a new delegation token identifier; all three names go to the superclass.
+   * @param owner the effective username of the token owner
+   * @param renewer the username of the renewer
+   * @param realUser the real username of the token owner
+   */
+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  /** @return the shared {@link #HIVE_DELEGATION_KIND} instance, never a copy */
+  @Override
+  public Text getKind() {
+    return HIVE_DELEGATION_KIND;
+  }
+}

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java?rev=1230391&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java Thu Jan 12 04:11:37 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * Secret manager for Hive delegation tokens. On top of the base class it offers
+ * helpers that take and return tokens in URL-string form for the current user.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Create a secret manager; every argument is handed to the superclass as-is.
+   * NOTE(review): time units are not visible here (the base class defines them) - confirm.
+   * @param delegationKeyUpdateInterval how often new secret keys are rolled
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens
+   * @param delegationTokenRenewInterval how often the tokens must be renewed
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+                                      long delegationTokenMaxLifetime,
+                                      long delegationTokenRenewInterval,
+                                      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  /** @return a newly constructed, empty {@link DelegationTokenIdentifier} */
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  /** Decodes the URL-string token and cancels it, naming the current user as canceller. */
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    cancelToken(tokenFromUrlString(tokenStrForm), UserGroupInformation.getCurrentUser().getUserName());
+  }
+
+  /** Decodes the URL-string token and renews it; @return the value produced by renewToken. */
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    return renewToken(tokenFromUrlString(tokenStrForm), UserGroupInformation.getCurrentUser().getUserName());
+  }
+
+  /** Builds a token owned by the current user and returns it in URL-string form. */
+  public synchronized String getDelegationToken(String renewer) throws IOException {
+    UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+    UserGroupInformation underlying = caller.getRealUser();
+    // realUser stays null unless the current user reports a real user.
+    Text realUser = (underlying == null) ? null : new Text(underlying.getUserName());
+    DelegationTokenIdentifier id =
+        new DelegationTokenIdentifier(new Text(caller.getUserName()), new Text(renewer), realUser);
+    return new Token<DelegationTokenIdentifier>(id, this).encodeToUrlString();
+  }
+
+  /** Shared by cancel and renew: reads a URL-string token into a fresh Token object. */
+  private static Token<DelegationTokenIdentifier> tokenFromUrlString(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> parsed = new Token<DelegationTokenIdentifier>();
+    parsed.decodeFromUrlString(tokenStrForm);
+    return parsed;
+  }
+}
+

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java?rev=1230391&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java Thu Jan 12 04:11:37 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * Token selector for Hive: constructed with the HIVE_DELEGATION_KIND token kind (not itself a token).
+ */
+
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND); // hands the Hive token kind to the base selector
+  }
+}