Posted to commits@bigtop.apache.org by rv...@apache.org on 2011/11/23 22:26:19 UTC
svn commit: r1205607 [2/3] - in /incubator/bigtop/branches/hadoop-0.23: ./
bigtop-packages/src/common/hive/ bigtop-packages/src/common/sqoop/
bigtop-packages/src/rpm/hive/SPECS/ bigtop-packages/src/rpm/mahout/SPECS/
bigtop-packages/src/rpm/sqoop/SPECS/
Modified: incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch?rev=1205607&r1=1205606&r2=1205607&view=diff
==============================================================================
--- incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch (original)
+++ incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch Wed Nov 23 21:26:18 2011
@@ -1,6 +1,350 @@
+Index: shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+===================================================================
+--- shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (revision 1203794)
++++ shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (working copy)
+@@ -1,61 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-
+-package org.apache.hadoop.fs;
+-
+-import java.io.*;
+-import java.net.URI;
+-import java.net.URISyntaxException;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.util.Progressable;
+-
+-/****************************************************************
+- * A Proxy for LocalFileSystem
+- *
+- * Serves uri's corresponding to 'pfile:///' namespace with using
+- * a LocalFileSystem
+- *****************************************************************/
+-
+-public class ProxyLocalFileSystem extends FilterFileSystem {
+-
+- protected LocalFileSystem localFs;
+-
+- public ProxyLocalFileSystem() {
+- localFs = new LocalFileSystem();
+- }
+-
+- public ProxyLocalFileSystem(FileSystem fs) {
+- throw new RuntimeException ("Unsupported Constructor");
+- }
+-
+- @Override
+- public void initialize(URI name, Configuration conf) throws IOException {
+- // create a proxy for the local filesystem
+- // the scheme/authority serving as the proxy is derived
+- // from the supplied URI
+-
+- String scheme = name.getScheme();
+- String authority = name.getAuthority() != null ? name.getAuthority() : "";
+- String proxyUriString = name + "://" + authority + "/";
+- fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
+-
+- fs.initialize(name, conf);
+- }
+-}
+Index: shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java
+===================================================================
+--- shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java (revision 1203794)
++++ shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java (working copy)
+@@ -1,273 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-
+-package org.apache.hadoop.fs;
+-
+-import java.io.IOException;
+-import java.net.URI;
+-import java.net.URISyntaxException;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.util.Progressable;
+-
+-/****************************************************************
+- * A FileSystem that can serve a given scheme/authority using some
+- * other file system. In that sense, it serves as a proxy for the
+- * real/underlying file system
+- *****************************************************************/
+-
+-public class ProxyFileSystem extends FilterFileSystem {
+-
+- protected String myScheme;
+- protected String myAuthority;
+- protected URI myUri;
+-
+- protected String realScheme;
+- protected String realAuthority;
+- protected URI realUri;
+-
+-
+-
+- private Path swizzleParamPath(Path p) {
+- return new Path (realScheme, realAuthority, p.toUri().getPath());
+- }
+-
+- private Path swizzleReturnPath(Path p) {
+- return new Path (myScheme, myAuthority, p.toUri().getPath());
+- }
+-
+- private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
+- FileStatus ret =
+- new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
+- orig.getBlockSize(), orig.getModificationTime(),
+- orig.getAccessTime(), orig.getPermission(),
+- orig.getOwner(), orig.getGroup(),
+- isParam ? swizzleParamPath(orig.getPath()) :
+- swizzleReturnPath(orig.getPath()));
+- return ret;
+- }
+-
+- public ProxyFileSystem() {
+- throw new RuntimeException ("Unsupported constructor");
+- }
+-
+- public ProxyFileSystem(FileSystem fs) {
+- throw new RuntimeException ("Unsupported constructor");
+- }
+-
+- /**
+- * Create a proxy file system for fs.
+- *
+- * @param fs FileSystem to create proxy for
+- * @param myUri URI to use as proxy. Only the scheme and authority from
+- * this are used right now
+- */
+- public ProxyFileSystem(FileSystem fs, URI myUri) {
+- super(fs);
+-
+- URI realUri = fs.getUri();
+- this.realScheme = realUri.getScheme();
+- this.realAuthority=realUri.getAuthority();
+- this.realUri = realUri;
+-
+- this.myScheme = myUri.getScheme();
+- this.myAuthority=myUri.getAuthority();
+- this.myUri = myUri;
+- }
+-
+- @Override
+- public void initialize(URI name, Configuration conf) throws IOException {
+- try {
+- URI realUri = new URI (realScheme, realAuthority,
+- name.getPath(), name.getQuery(), name.getFragment());
+- super.initialize(realUri, conf);
+- } catch (URISyntaxException e) {
+- throw new RuntimeException(e);
+- }
+- }
+-
+- @Override
+- public URI getUri() {
+- return myUri;
+- }
+-
+- @Override
+- public String getName() {
+- return getUri().toString();
+- }
+-
+- @Override
+- public Path makeQualified(Path path) {
+- return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
+- }
+-
+-
+- @Override
+- protected void checkPath(Path path) {
+- super.checkPath(swizzleParamPath(path));
+- }
+-
+- @Override
+- public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+- long len) throws IOException {
+- return super.getFileBlockLocations(swizzleFileStatus(file, true),
+- start, len);
+- }
+-
+- @Override
+- public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+- return super.open(swizzleParamPath(f), bufferSize);
+- }
+-
+- @Override
+- public FSDataOutputStream append(Path f, int bufferSize,
+- Progressable progress) throws IOException {
+- return super.append(swizzleParamPath(f), bufferSize, progress);
+- }
+-
+- @Override
+- public FSDataOutputStream create(Path f, FsPermission permission,
+- boolean overwrite, int bufferSize, short replication, long blockSize,
+- Progressable progress) throws IOException {
+- return super.create(swizzleParamPath(f), permission,
+- overwrite, bufferSize, replication, blockSize, progress);
+- }
+-
+- @Override
+- public boolean setReplication(Path src, short replication) throws IOException {
+- return super.setReplication(swizzleParamPath(src), replication);
+- }
+-
+- @Override
+- public boolean rename(Path src, Path dst) throws IOException {
+- return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+- }
+-
+- @Override
+- public boolean delete(Path f, boolean recursive) throws IOException {
+- return super.delete(swizzleParamPath(f), recursive);
+- }
+-
+- @Override
+- public boolean deleteOnExit(Path f) throws IOException {
+- return super.deleteOnExit(swizzleParamPath(f));
+- }
+-
+- @Override
+- public FileStatus[] listStatus(Path f) throws IOException {
+- FileStatus[] orig = super.listStatus(swizzleParamPath(f));
+- FileStatus[] ret = new FileStatus [orig.length];
+- for (int i=0; i<orig.length; i++) {
+- ret[i] = swizzleFileStatus(orig[i], false);
+- }
+- return ret;
+- }
+-
+- @Override
+- public Path getHomeDirectory() {
+- return swizzleReturnPath(super.getHomeDirectory());
+- }
+-
+- @Override
+- public void setWorkingDirectory(Path newDir) {
+- super.setWorkingDirectory(swizzleParamPath(newDir));
+- }
+-
+- @Override
+- public Path getWorkingDirectory() {
+- return swizzleReturnPath(super.getWorkingDirectory());
+- }
+-
+- @Override
+- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+- return super.mkdirs(swizzleParamPath(f), permission);
+- }
+-
+- @Override
+- public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+- throws IOException {
+- super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
+- }
+-
+- @Override
+- public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+- Path[] srcs, Path dst)
+- throws IOException {
+- super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
+- }
+-
+- @Override
+- public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+- Path src, Path dst)
+- throws IOException {
+- super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
+- }
+-
+- @Override
+- public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+- throws IOException {
+- super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
+- }
+-
+- @Override
+- public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+- throws IOException {
+- return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
+- }
+-
+- @Override
+- public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+- throws IOException {
+- super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
+- }
+-
+- @Override
+- public ContentSummary getContentSummary(Path f) throws IOException {
+- return super.getContentSummary(swizzleParamPath(f));
+- }
+-
+- @Override
+- public FileStatus getFileStatus(Path f) throws IOException {
+- return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
+- }
+-
+- @Override
+- public FileChecksum getFileChecksum(Path f) throws IOException {
+- return super.getFileChecksum(swizzleParamPath(f));
+- }
+-
+- @Override
+- public void setOwner(Path p, String username, String groupname
+- ) throws IOException {
+- super.setOwner(swizzleParamPath(p), username, groupname);
+- }
+-
+- @Override
+- public void setTimes(Path p, long mtime, long atime
+- ) throws IOException {
+- super.setTimes(swizzleParamPath(p), mtime, atime);
+- }
+-
+- @Override
+- public void setPermission(Path p, FsPermission permission
+- ) throws IOException {
+- super.setPermission(swizzleParamPath(p), permission);
+- }
+-}
+-
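
The two classes removed above implement Hive's path "swizzling": every Path argument is rewritten from the proxy scheme/authority (e.g. pfile://) to the real one before delegating to the wrapped filesystem, and every Path in a return value is rewritten back. A minimal usage sketch, assuming these classes are on the classpath and that the "pfile" scheme is wired to ProxyLocalFileSystem via fs.pfile.impl (the way Hive's tests are commonly configured):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PfileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hadoop resolves a scheme through fs.<scheme>.impl; mapping the assumed
    // "pfile" scheme onto ProxyLocalFileSystem routes it to the local disk.
    conf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");
    FileSystem fs = FileSystem.get(URI.create("pfile:///tmp"), conf);
    // The proxy swizzles pfile:///tmp to the local path and swizzles the
    // answer back, so the returned status carries a pfile:// path again.
    System.out.println(fs.getFileStatus(new Path("pfile:///tmp")).getPath());
  }
}
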
Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
---- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 1196775)
+--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 1203794)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (working copy)
@@ -35,6 +35,7 @@
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
@@ -80,7 +424,7 @@ Index: shims/src/0.20/java/org/apache/ha
}
Index: shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
===================================================================
---- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (revision 1196775)
+--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (revision 1203794)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (working copy)
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
@@ -139,11 +483,11 @@ Index: shims/src/0.20S/java/org/apache/h
+ return new org.apache.hadoop.mapreduce.JobContext(job.getConfiguration(), job.getJobID());
+ }
}
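
The newJobContext additions here and in Hadoop23Shims further down differ for a reason worth spelling out: on 0.20.20x, org.apache.hadoop.mapreduce.JobContext is a concrete class, while in 0.23 it became an interface backed by JobContextImpl, so the 0.23 shim must instantiate the impl class instead. A sketch of the 0.23 side, matching the code this patch adds later:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

public class JobContextSketch {
  // On 0.23, JobContext can no longer be constructed directly; the concrete
  // JobContextImpl takes the same (Configuration, JobID) pair.
  public static JobContext newJobContext(Job job) {
    return new JobContextImpl(job.getConfiguration(), job.getJobID());
  }
}
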
-Index: shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java
===================================================================
---- shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (revision 0)
-@@ -0,0 +1,61 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java (revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java (revision 0)
+@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
@@ -162,60 +506,95 @@ Index: shims/src/0.23/java/org/apache/ha
+ * limitations under the License.
+ */
+
-+package org.apache.hadoop.fs;
++package org.apache.hadoop.hive.thrift;
+
-+import java.io.*;
-+import java.net.URI;
-+import java.net.URISyntaxException;
++import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+import org.apache.hadoop.util.Progressable;
++/**
++ * A delegation token that is specialized for Hive
++ */
+
-+/****************************************************************
-+ * A Proxy for LocalFileSystem
++public class DelegationTokenSelector23
++ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier23>{
++
++ public DelegationTokenSelector23() {
++ super(DelegationTokenIdentifier23.HIVE_DELEGATION_KIND);
++ }
++}
+
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java
+___________________________________________________________________
+Added: svn:eol-style
+ + native
+
+Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java
+===================================================================
+--- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java (revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java (revision 0)
+@@ -0,0 +1,52 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
+ *
-+ * Serves uri's corresponding to 'pfile:///' namespace with using
-+ * a LocalFileSystem
-+ *****************************************************************/
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+
-+public class ProxyLocalFileSystem extends FilterFileSystem {
++package org.apache.hadoop.hive.thrift;
+
-+ protected LocalFileSystem localFs;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
-+ public ProxyLocalFileSystem() {
-+ localFs = new LocalFileSystem();
++/**
++ * A delegation token identifier that is specific to Hive.
++ */
++public class DelegationTokenIdentifier23
++ extends AbstractDelegationTokenIdentifier {
++ public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
++
++ /**
++ * Create an empty delegation token identifier for reading into.
++ */
++ public DelegationTokenIdentifier23() {
+ }
+
-+ public ProxyLocalFileSystem(FileSystem fs) {
-+ throw new RuntimeException ("Unsupported Constructor");
++ /**
++ * Create a new delegation token identifier
++ * @param owner the effective username of the token owner
++ * @param renewer the username of the renewer
++ * @param realUser the real username of the token owner
++ */
++ public DelegationTokenIdentifier23(Text owner, Text renewer, Text realUser) {
++ super(owner, renewer, realUser);
+ }
+
+ @Override
-+ public void initialize(URI name, Configuration conf) throws IOException {
-+ // create a proxy for the local filesystem
-+ // the scheme/authority serving as the proxy is derived
-+ // from the supplied URI
-+
-+ String scheme = name.getScheme();
-+ String authority = name.getAuthority() != null ? name.getAuthority() : "";
-+ String proxyUriString = name + "://" + authority + "/";
-+ fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
-+
-+ fs.initialize(name, conf);
++ public Text getKind() {
++ return HIVE_DELEGATION_KIND;
+ }
++
+}
-Property changes on: shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java
___________________________________________________________________
Added: svn:eol-style
+ native
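
The "23"-suffixed identifier and selector added above mirror the delegation-token classes Hive already carries for secure 0.20, presumably renamed to keep them distinct on the classpath. They are consumed together: the selector scans a user's credentials for a token whose kind is HIVE_DELEGATION_TOKEN and whose service matches a given signature, exactly as Hadoop23Shims.getTokenStrForm does later in this patch. A condensed sketch of that lookup:

import java.io.IOException;

import org.apache.hadoop.hive.thrift.DelegationTokenSelector23;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class HiveTokenLookupSketch {
  // Returns the URL-safe string form of the Hive delegation token held by
  // the current user, or null when no matching token is present.
  public static String tokenStrForm(String tokenSignature) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    Token<? extends TokenIdentifier> token = new DelegationTokenSelector23()
        .selectToken(tokenSignature == null ? new Text() : new Text(tokenSignature),
                     ugi.getTokens());
    return token != null ? token.encodeToUrlString() : null;
  }
}
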
-Index: shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
===================================================================
---- shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java (revision 0)
-@@ -0,0 +1,273 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (revision 0)
+@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
@@ -233,434 +612,546 @@ Index: shims/src/0.23/java/org/apache/ha
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
++package org.apache.hadoop.hive.shims;
+
-+package org.apache.hadoop.fs;
-+
++import java.io.DataInput;
++import java.io.DataOutput;
+import java.io.IOException;
-+import java.net.URI;
-+import java.net.URISyntaxException;
++import java.lang.reflect.Constructor;
++import java.util.ArrayList;
++import java.util.List;
++
++import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.PathFilter;
++import org.apache.hadoop.hdfs.MiniDFSCluster;
++import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
++import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
++import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
++import org.apache.hadoop.hive.thrift.DelegationTokenSelector23;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.mapred.ClusterStatus;
++import org.apache.hadoop.mapred.FileInputFormat;
++import org.apache.hadoop.mapred.InputFormat;
++import org.apache.hadoop.mapred.InputSplit;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.mapred.JobContext;
++import org.apache.hadoop.mapred.JobStatus;
++import org.apache.hadoop.mapred.OutputCommitter;
++import org.apache.hadoop.mapred.RecordReader;
++import org.apache.hadoop.mapred.Reporter;
++import org.apache.hadoop.mapred.RunningJob;
++import org.apache.hadoop.mapred.TaskAttemptContext;
++import org.apache.hadoop.mapred.TaskCompletionEvent;
++import org.apache.hadoop.mapred.TaskID;
++import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
++import org.apache.hadoop.mapred.lib.CombineFileSplit;
++import org.apache.hadoop.mapred.lib.NullOutputFormat;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.mapreduce.TaskAttemptID;
++import org.apache.hadoop.mapreduce.task.JobContextImpl;
++import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
++import org.apache.hadoop.security.UserGroupInformation;
++import org.apache.hadoop.security.token.Token;
++import org.apache.hadoop.security.token.TokenIdentifier;
++import org.apache.hadoop.security.token.TokenSelector;
++import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.Progressable;
++import org.apache.hadoop.util.ToolRunner;
+
-+/****************************************************************
-+ * A FileSystem that can serve a given scheme/authority using some
-+ * other file system. In that sense, it serves as a proxy for the
-+ * real/underlying file system
-+ *****************************************************************/
-+
-+public class ProxyFileSystem extends FilterFileSystem {
-+
-+ protected String myScheme;
-+ protected String myAuthority;
-+ protected URI myUri;
-+
-+ protected String realScheme;
-+ protected String realAuthority;
-+ protected URI realUri;
-+
-+
-+
-+ private Path swizzleParamPath(Path p) {
-+ return new Path (realScheme, realAuthority, p.toUri().getPath());
++/**
++ * Implemention of shims against Hadoop 0.23.0.
++ */
++public class Hadoop23Shims implements HadoopShims {
++ public boolean usesJobShell() {
++ return false;
+ }
+
-+ private Path swizzleReturnPath(Path p) {
-+ return new Path (myScheme, myAuthority, p.toUri().getPath());
++ public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
++ throws IOException {
++
++ return fs.deleteOnExit(path);
+ }
+
-+ private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
-+ FileStatus ret =
-+ new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
-+ orig.getBlockSize(), orig.getModificationTime(),
-+ orig.getAccessTime(), orig.getPermission(),
-+ orig.getOwner(), orig.getGroup(),
-+ isParam ? swizzleParamPath(orig.getPath()) :
-+ swizzleReturnPath(orig.getPath()));
-+ return ret;
++ public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
++ throws IOException {
++ // gone in 0.18+
+ }
+
-+ public ProxyFileSystem() {
-+ throw new RuntimeException ("Unsupported constructor");
++ public boolean isJobPreparing(RunningJob job) throws IOException {
++ return job.getJobState() == JobStatus.PREP;
+ }
-+
-+ public ProxyFileSystem(FileSystem fs) {
-+ throw new RuntimeException ("Unsupported constructor");
++ /**
++ * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
++ */
++ public void setTmpFiles(String prop, String files) {
++ // gone in 20+
++ }
++
++ public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
++ int numDataNodes,
++ boolean format,
++ String[] racks) throws IOException {
++ return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
+ }
+
+ /**
-+ * Create a proxy file system for fs.
-+ *
-+ * @param fs FileSystem to create proxy for
-+ * @param myUri URI to use as proxy. Only the scheme and authority from
-+ * this are used right now
++ * MiniDFSShim.
++ *
+ */
-+ public ProxyFileSystem(FileSystem fs, URI myUri) {
-+ super(fs);
++ public class MiniDFSShim implements HadoopShims.MiniDFSShim {
++ private final MiniDFSCluster cluster;
+
-+ URI realUri = fs.getUri();
-+ this.realScheme = realUri.getScheme();
-+ this.realAuthority=realUri.getAuthority();
-+ this.realUri = realUri;
++ public MiniDFSShim(MiniDFSCluster cluster) {
++ this.cluster = cluster;
++ }
+
-+ this.myScheme = myUri.getScheme();
-+ this.myAuthority=myUri.getAuthority();
-+ this.myUri = myUri;
-+ }
++ public FileSystem getFileSystem() throws IOException {
++ return cluster.getFileSystem();
++ }
+
-+ @Override
-+ public void initialize(URI name, Configuration conf) throws IOException {
-+ try {
-+ URI realUri = new URI (realScheme, realAuthority,
-+ name.getPath(), name.getQuery(), name.getFragment());
-+ super.initialize(realUri, conf);
-+ } catch (URISyntaxException e) {
-+ throw new RuntimeException(e);
++ public void shutdown() {
++ cluster.shutdown();
+ }
+ }
+
-+ @Override
-+ public URI getUri() {
-+ return myUri;
++ /**
++ * We define this function here to make the code compatible between
++ * hadoop 0.17 and hadoop 0.20.
++ *
++ * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
++ * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
++ * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
++ * references that class, which is not available in hadoop 0.17.
++ */
++ public int compareText(Text a, Text b) {
++ return a.compareTo(b);
+ }
+
+ @Override
-+ public String getName() {
-+ return getUri().toString();
++ public long getAccessTime(FileStatus file) {
++ return file.getAccessTime();
+ }
+
-+ @Override
-+ public Path makeQualified(Path path) {
-+ return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
++ public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
++ return new CombineFileInputFormatShim() {
++ @Override
++ public RecordReader getRecordReader(InputSplit split,
++ JobConf job, Reporter reporter) throws IOException {
++ throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
++ }
++ };
+ }
+
++ public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
++ long shrinkedLength;
++ boolean _isShrinked;
++ public InputSplitShim() {
++ super();
++ _isShrinked = false;
++ }
+
-+ @Override
-+ protected void checkPath(Path path) {
-+ super.checkPath(swizzleParamPath(path));
-+ }
++ public InputSplitShim(CombineFileSplit old) throws IOException {
++ super(old);
++ _isShrinked = false;
++ }
+
-+ @Override
-+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-+ long len) throws IOException {
-+ return super.getFileBlockLocations(swizzleFileStatus(file, true),
-+ start, len);
-+ }
++ @Override
++ public void shrinkSplit(long length) {
++ _isShrinked = true;
++ shrinkedLength = length;
++ }
+
-+ @Override
-+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-+ return super.open(swizzleParamPath(f), bufferSize);
-+ }
++ public boolean isShrinked() {
++ return _isShrinked;
++ }
+
-+ @Override
-+ public FSDataOutputStream append(Path f, int bufferSize,
-+ Progressable progress) throws IOException {
-+ return super.append(swizzleParamPath(f), bufferSize, progress);
-+ }
++ public long getShrinkedLength() {
++ return shrinkedLength;
++ }
+
-+ @Override
-+ public FSDataOutputStream create(Path f, FsPermission permission,
-+ boolean overwrite, int bufferSize, short replication, long blockSize,
-+ Progressable progress) throws IOException {
-+ return super.create(swizzleParamPath(f), permission,
-+ overwrite, bufferSize, replication, blockSize, progress);
-+ }
++ @Override
++ public void readFields(DataInput in) throws IOException {
++ super.readFields(in);
++ _isShrinked = in.readBoolean();
++ if (_isShrinked) {
++ shrinkedLength = in.readLong();
++ }
++ }
+
-+ @Override
-+ public boolean setReplication(Path src, short replication) throws IOException {
-+ return super.setReplication(swizzleParamPath(src), replication);
++ @Override
++ public void write(DataOutput out) throws IOException {
++ super.write(out);
++ out.writeBoolean(_isShrinked);
++ if (_isShrinked) {
++ out.writeLong(shrinkedLength);
++ }
++ }
+ }
+
-+ @Override
-+ public boolean rename(Path src, Path dst) throws IOException {
-+ return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
-+ }
-+
-+ @Override
-+ public boolean delete(Path f, boolean recursive) throws IOException {
-+ return super.delete(swizzleParamPath(f), recursive);
-+ }
++ /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
++ * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
++ * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
++ */
++ public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
+
-+ @Override
-+ public boolean deleteOnExit(Path f) throws IOException {
-+ return super.deleteOnExit(swizzleParamPath(f));
-+ }
++ static final Class[] constructorSignature = new Class[] {
++ InputSplit.class,
++ Configuration.class,
++ Reporter.class,
++ Integer.class
++ };
++
++ protected CombineFileSplit split;
++ protected JobConf jc;
++ protected Reporter reporter;
++ protected Class<RecordReader<K, V>> rrClass;
++ protected Constructor<RecordReader<K, V>> rrConstructor;
++ protected FileSystem fs;
++
++ protected int idx;
++ protected long progress;
++ protected RecordReader<K, V> curReader;
++ protected boolean isShrinked;
++ protected long shrinkedLength;
+
-+ @Override
-+ public FileStatus[] listStatus(Path f) throws IOException {
-+ FileStatus[] orig = super.listStatus(swizzleParamPath(f));
-+ FileStatus[] ret = new FileStatus [orig.length];
-+ for (int i=0; i<orig.length; i++) {
-+ ret[i] = swizzleFileStatus(orig[i], false);
++ public boolean next(K key, V value) throws IOException {
++
++ while ((curReader == null)
++ || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
++ value)) {
++ if (!initNextRecordReader(key)) {
++ return false;
++ }
++ }
++ return true;
+ }
-+ return ret;
-+ }
-+
-+ @Override
-+ public Path getHomeDirectory() {
-+ return swizzleReturnPath(super.getHomeDirectory());
-+ }
+
-+ @Override
-+ public void setWorkingDirectory(Path newDir) {
-+ super.setWorkingDirectory(swizzleParamPath(newDir));
-+ }
-+
-+ @Override
-+ public Path getWorkingDirectory() {
-+ return swizzleReturnPath(super.getWorkingDirectory());
-+ }
-+
-+ @Override
-+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-+ return super.mkdirs(swizzleParamPath(f), permission);
-+ }
++ public K createKey() {
++ K newKey = curReader.createKey();
++ return (K)(new CombineHiveKey(newKey));
++ }
+
-+ @Override
-+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
-+ throws IOException {
-+ super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
-+ }
++ public V createValue() {
++ return curReader.createValue();
++ }
+
-+ @Override
-+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
-+ Path[] srcs, Path dst)
-+ throws IOException {
-+ super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
-+ }
-+
-+ @Override
-+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
-+ Path src, Path dst)
-+ throws IOException {
-+ super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
-+ }
++ /**
++ * Return the amount of data processed.
++ */
++ public long getPos() throws IOException {
++ return progress;
++ }
+
-+ @Override
-+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
-+ throws IOException {
-+ super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
-+ }
++ public void close() throws IOException {
++ if (curReader != null) {
++ curReader.close();
++ curReader = null;
++ }
++ }
+
-+ @Override
-+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-+ throws IOException {
-+ return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
-+ }
++ /**
++ * Return progress based on the amount of data processed so far.
++ */
++ public float getProgress() throws IOException {
++ return Math.min(1.0f, progress / (float) (split.getLength()));
++ }
+
-+ @Override
-+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-+ throws IOException {
-+ super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
-+ }
++ /**
++ * A generic RecordReader that can hand out different recordReaders
++ * for each chunk in the CombineFileSplit.
++ */
++ public CombineFileRecordReader(JobConf job, CombineFileSplit split,
++ Reporter reporter,
++ Class<RecordReader<K, V>> rrClass)
++ throws IOException {
++ this.split = split;
++ this.jc = job;
++ this.rrClass = rrClass;
++ this.reporter = reporter;
++ this.idx = 0;
++ this.curReader = null;
++ this.progress = 0;
+
-+ @Override
-+ public ContentSummary getContentSummary(Path f) throws IOException {
-+ return super.getContentSummary(swizzleParamPath(f));
-+ }
++ isShrinked = false;
+
-+ @Override
-+ public FileStatus getFileStatus(Path f) throws IOException {
-+ return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
-+ }
++ assert (split instanceof InputSplitShim);
++ if (((InputSplitShim) split).isShrinked()) {
++ isShrinked = true;
++ shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
++ }
+
-+ @Override
-+ public FileChecksum getFileChecksum(Path f) throws IOException {
-+ return super.getFileChecksum(swizzleParamPath(f));
-+ }
-+
-+ @Override
-+ public void setOwner(Path p, String username, String groupname
-+ ) throws IOException {
-+ super.setOwner(swizzleParamPath(p), username, groupname);
-+ }
++ try {
++ rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
++ rrConstructor.setAccessible(true);
++ } catch (Exception e) {
++ throw new RuntimeException(rrClass.getName() +
++ " does not have valid constructor", e);
++ }
++ initNextRecordReader(null);
++ }
++
++ /**
++ * do next and handle exception inside it.
++ * @param key
++ * @param value
++ * @return
++ * @throws IOException
++ */
++ private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
++ try {
++ return curReader.next(key, value);
++ } catch (Exception e) {
++ return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
++ }
++ }
+
-+ @Override
-+ public void setTimes(Path p, long mtime, long atime
-+ ) throws IOException {
-+ super.setTimes(swizzleParamPath(p), mtime, atime);
-+ }
++ /**
++ * Get the record reader for the next chunk in this CombineFileSplit.
++ */
++ protected boolean initNextRecordReader(K key) throws IOException {
+
-+ @Override
-+ public void setPermission(Path p, FsPermission permission
-+ ) throws IOException {
-+ super.setPermission(swizzleParamPath(p), permission);
-+ }
-+}
-+
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java
-___________________________________________________________________
-Added: svn:eol-style
- + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java (revision 0)
-@@ -0,0 +1,52 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
++ if (curReader != null) {
++ curReader.close();
++ curReader = null;
++ if (idx > 0) {
++ progress += split.getLength(idx - 1); // done processing so far
++ }
++ }
+
-+package org.apache.hadoop.hive.thrift;
++ // if all chunks have been processed or reached the length, nothing more to do.
++ if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
++ return false;
++ }
+
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
++ // get a record reader for the idx-th chunk
++ try {
++ curReader = rrConstructor.newInstance(new Object[]
++ {split, jc, reporter, Integer.valueOf(idx)});
+
-+/**
-+ * A delegation token identifier that is specific to Hive.
-+ */
-+public class DelegationTokenIdentifier
-+ extends AbstractDelegationTokenIdentifier {
-+ public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
++ // change the key if need be
++ if (key != null) {
++ K newKey = curReader.createKey();
++ ((CombineHiveKey)key).setKey(newKey);
++ }
+
-+ /**
-+ * Create an empty delegation token identifier for reading into.
-+ */
-+ public DelegationTokenIdentifier() {
++ // setup some helper config variables.
++ jc.set("map.input.file", split.getPath(idx).toString());
++ jc.setLong("map.input.start", split.getOffset(idx));
++ jc.setLong("map.input.length", split.getLength(idx));
++ } catch (Exception e) {
++ curReader=HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
++ }
++ idx++;
++ return true;
++ }
+ }
+
-+ /**
-+ * Create a new delegation token identifier
-+ * @param owner the effective username of the token owner
-+ * @param renewer the username of the renewer
-+ * @param realUser the real username of the token owner
-+ */
-+ public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
-+ super(owner, renewer, realUser);
-+ }
++ public abstract static class CombineFileInputFormatShim<K, V> extends
++ CombineFileInputFormat<K, V>
++ implements HadoopShims.CombineFileInputFormatShim<K, V> {
+
-+ @Override
-+ public Text getKind() {
-+ return HIVE_DELEGATION_KIND;
-+ }
++ public Path[] getInputPathsShim(JobConf conf) {
++ try {
++ return FileInputFormat.getInputPaths(conf);
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ }
+
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
-___________________________________________________________________
-Added: svn:eol-style
- + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (revision 0)
-@@ -0,0 +1,87 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
++ @Override
++ public void createPool(JobConf conf, PathFilter... filters) {
++ super.createPool(conf, filters);
++ }
+
-+package org.apache.hadoop.hive.thrift;
++ @Override
++ public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
++ long minSize = job.getLong("mapred.min.split.size", 0);
+
-+import java.io.IOException;
++ // For backward compatibility, let the above parameter be used
++ if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
++ super.setMinSplitSizeNode(minSize);
++ }
+
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.security.UserGroupInformation;
-+import org.apache.hadoop.security.token.Token;
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
++ if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
++ super.setMinSplitSizeRack(minSize);
++ }
+
-+/**
-+ * A Hive specific delegation token secret manager.
-+ * The secret manager is responsible for generating and accepting the password
-+ * for each token.
-+ */
-+public class DelegationTokenSecretManager
-+ extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
++ if (job.getLong("mapred.max.split.size", 0) == 0) {
++ super.setMaxSplitSize(minSize);
++ }
+
-+ /**
-+ * Create a secret manager
-+ * @param delegationKeyUpdateInterval the number of seconds for rolling new
-+ * secret keys.
-+ * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
-+ * tokens
-+ * @param delegationTokenRenewInterval how often the tokens must be renewed
-+ * @param delegationTokenRemoverScanInterval how often the tokens are scanned
-+ * for expired tokens
-+ */
-+ public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
-+ long delegationTokenMaxLifetime,
-+ long delegationTokenRenewInterval,
-+ long delegationTokenRemoverScanInterval) {
-+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
-+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
-+ }
++ CombineFileSplit[] splits = (CombineFileSplit[]) super.getSplits(job, numSplits);
+
-+ @Override
-+ public DelegationTokenIdentifier createIdentifier() {
-+ return new DelegationTokenIdentifier();
-+ }
++ InputSplitShim[] isplits = new InputSplitShim[splits.length];
++ for (int pos = 0; pos < splits.length; pos++) {
++ isplits[pos] = new InputSplitShim(splits[pos]);
++ }
+
-+ public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
-+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-+ t.decodeFromUrlString(tokenStrForm);
-+ String user = UserGroupInformation.getCurrentUser().getUserName();
-+ cancelToken(t, user);
-+ }
++ return isplits;
++ }
+
-+ public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
-+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-+ t.decodeFromUrlString(tokenStrForm);
-+ String user = UserGroupInformation.getCurrentUser().getUserName();
-+ return renewToken(t, user);
-+ }
++ public InputSplitShim getInputSplitShim() throws IOException {
++ return new InputSplitShim();
++ }
++
++ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
++ Reporter reporter,
++ Class<RecordReader<K, V>> rrClass)
++ throws IOException {
++ CombineFileSplit cfSplit = (CombineFileSplit) split;
++ return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
++ }
+
-+ public synchronized String getDelegationToken(String renewer) throws IOException {
-+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-+ Text owner = new Text(ugi.getUserName());
-+ Text realUser = null;
-+ if (ugi.getRealUser() != null) {
-+ realUser = new Text(ugi.getRealUser().getUserName());
-+ }
-+ DelegationTokenIdentifier ident =
-+ new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
-+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
-+ ident, this);
-+ return t.encodeToUrlString();
+ }
-+}
+
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
++ public String getInputFormatClassName() {
++ return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
++ }
++
++ String[] ret = new String[2];
++
++ @Override
++ public String[] getTaskJobIDs(TaskCompletionEvent t) {
++ TaskID tid = t.getTaskAttemptId().getTaskID();
++ ret[0] = tid.toString();
++ ret[1] = tid.getJobID().toString();
++ return ret;
++ }
++
++ public void setFloatConf(Configuration conf, String varName, float val) {
++ conf.setFloat(varName, val);
++ }
++
++ @Override
++ public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
++ String archiveName) throws Exception {
++
++ HadoopArchives har = new HadoopArchives(conf);
++ List<String> args = new ArrayList<String>();
++
++ if (conf.get("hive.archive.har.parentdir.settable") == null) {
++ throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
++ }
++ boolean parentSettable =
++ conf.getBoolean("hive.archive.har.parentdir.settable", false);
++
++ if (parentSettable) {
++ args.add("-archiveName");
++ args.add(archiveName);
++ args.add("-p");
++ args.add(sourceDir.toString());
++ args.add(destDir.toString());
++ } else {
++ args.add("-archiveName");
++ args.add(archiveName);
++ args.add(sourceDir.toString());
++ args.add(destDir.toString());
++ }
++
++ return ToolRunner.run(har, args.toArray(new String[0]));
++ }
++
++ public static class NullOutputCommitter extends OutputCommitter {
++ @Override
++ public void setupJob(JobContext jobContext) { }
++ @Override
++ public void cleanupJob(JobContext jobContext) { }
++
++ @Override
++ public void setupTask(TaskAttemptContext taskContext) { }
++ @Override
++ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
++ return false;
++ }
++ @Override
++ public void commitTask(TaskAttemptContext taskContext) { }
++ @Override
++ public void abortTask(TaskAttemptContext taskContext) { }
++ }
++
++ public void setNullOutputFormat(JobConf conf) {
++ conf.setOutputFormat(NullOutputFormat.class);
++ conf.setOutputCommitter(Hadoop23Shims.NullOutputCommitter.class);
++
++ // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
++ // but can be backported. So we disable setup/cleanup in all versions >= 0.19
++ conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
++
++ // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
++ // but can be backported. So we disable setup/cleanup in all versions >= 0.19
++ conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
++ }
++
++ @Override
++ public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
++ return UserGroupInformation.getCurrentUser();
++ }
++
++ @Override
++ public boolean isSecureShimImpl() {
++ return true;
++ }
++
++ @Override
++ public String getShortUserName(UserGroupInformation ugi) {
++ return ugi.getShortUserName();
++ }
++
++ @Override
++ public String getTokenStrForm(String tokenSignature) throws IOException {
++ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
++ TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector23();
++
++ Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
++ tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
++ return token != null ? token.encodeToUrlString() : null;
++ }
++
++ @Override
++ public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
++ JobTrackerState state;
++ switch (clusterStatus.getJobTrackerStatus()) {
++ case INITIALIZING:
++ return JobTrackerState.INITIALIZING;
++ case RUNNING:
++ return JobTrackerState.RUNNING;
++ default:
++ String errorMsg = "Unrecognized JobTracker state: " + clusterStatus.getJobTrackerStatus();
++ throw new Exception(errorMsg);
++ }
++ }
++
++ @Override
++ public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
++ return new TaskAttemptContextImpl(conf, new TaskAttemptID()) {
++ @Override
++ public void progress() {
++ progressable.progress();
++ }
++ };
++ }
++
++ @Override
++ public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) {
++ return new JobContextImpl(job.getConfiguration(), job.getJobID());
++ }
++}
+
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
___________________________________________________________________
Added: svn:eol-style
+ native
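
One detail of Hadoop23Shims worth calling out: createHadoopArchive drives the har tool programmatically through ToolRunner, assembling the same argument vector the command line would. A compact sketch of the parent-settable branch, with hypothetical archive name and paths:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ToolRunner;

public class HarSketch {
  // Equivalent of: hadoop archive -archiveName parts.har -p /warehouse/t /warehouse
  public static int archive(Configuration conf) throws Exception {
    List<String> args = new ArrayList<String>();
    args.add("-archiveName");
    args.add("parts.har");          // hypothetical archive name
    args.add("-p");                 // parent-dir flag, taken when
    args.add("/warehouse/t");       // hive.archive.har.parentdir.settable is true
    args.add("/warehouse");
    return ToolRunner.run(new HadoopArchives(conf), args.toArray(new String[0]));
  }
}
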
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java (revision 0)
-@@ -0,0 +1,33 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java (revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java (revision 0)
+@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
@@ -678,48 +1169,56 @@ Index: shims/src/0.23/java/org/apache/ha
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
++package org.apache.hadoop.hive.shims;
+
-+package org.apache.hadoop.hive.thrift;
++import java.io.IOException;
+
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
++import org.mortbay.jetty.bio.SocketConnector;
++import org.mortbay.jetty.handler.RequestLogHandler;
++import org.mortbay.jetty.webapp.WebAppContext;
+
+/**
-+ * A delegation token that is specialized for Hive
++ * Jetty23Shims.
++ *
+ */
++public class Jetty23Shims implements JettyShims {
++ public Server startServer(String listen, int port) throws IOException {
++ Server s = new Server();
++ s.setupListenerHostPort(listen, port);
++ return s;
++ }
++
++ private static class Server extends org.mortbay.jetty.Server implements JettyShims.Server {
++ public void addWar(String war, String contextPath) {
++ WebAppContext wac = new WebAppContext();
++ wac.setContextPath(contextPath);
++ wac.setWar(war);
++ RequestLogHandler rlh = new RequestLogHandler();
++ rlh.setHandler(wac);
++ this.addHandler(rlh);
++ }
+
-+public class DelegationTokenSelector
-+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
++ public void setupListenerHostPort(String listen, int port)
++ throws IOException {
+
-+ public DelegationTokenSelector() {
-+ super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
++ SocketConnector connector = new SocketConnector();
++ connector.setPort(port);
++ connector.setHost(listen);
++ this.addConnector(connector);
++ }
+ }
+}
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
-___________________________________________________________________
-Added: svn:eol-style
- + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java (revision 0)
-@@ -0,0 +1,4 @@
-+package org.apache.hadoop.hive.shims;
-+
-+class EmptyShim {
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
___________________________________________________________________
Added: svn:eol-style
+ native
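
Jetty23Shims above wraps the Jetty 6 ("org.mortbay") API bundled with Hadoop 0.23 so that Hive's web interface can stay version-agnostic. A minimal sketch of how the shim is driven, assuming a hypothetical war path and that JettyShims.Server exposes start() via the underlying org.mortbay.jetty.Server:

import org.apache.hadoop.hive.shims.Jetty23Shims;
import org.apache.hadoop.hive.shims.JettyShims;

public class HwiSketch {
  public static void main(String[] args) throws Exception {
    JettyShims.Server server = new Jetty23Shims().startServer("0.0.0.0", 9999);
    server.addWar("/opt/hive/lib/hive-hwi.war", "/hwi"); // hypothetical war path
    server.start(); // provided by the underlying org.mortbay.jetty.Server
  }
}
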
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+Index: shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (revision 0)
-@@ -0,0 +1,546 @@
+--- shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (revision 0)
++++ shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java (revision 0)
+@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
@@ -737,688 +1236,343 @@ Index: shims/src/0.23/java/org/apache/ha
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-+package org.apache.hadoop.hive.shims;
+
-+import java.io.DataInput;
-+import java.io.DataOutput;
-+import java.io.IOException;
-+import java.lang.reflect.Constructor;
-+import java.util.ArrayList;
-+import java.util.List;
++package org.apache.hadoop.fs;
+
-+import javax.security.auth.login.LoginException;
++import java.io.*;
++import java.net.URI;
++import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.PathFilter;
-+import org.apache.hadoop.hdfs.MiniDFSCluster;
-+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
-+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-+import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.mapred.ClusterStatus;
-+import org.apache.hadoop.mapred.FileInputFormat;
-+import org.apache.hadoop.mapred.InputFormat;
-+import org.apache.hadoop.mapred.InputSplit;
-+import org.apache.hadoop.mapred.JobConf;
-+import org.apache.hadoop.mapred.JobContext;
-+import org.apache.hadoop.mapred.JobStatus;
-+import org.apache.hadoop.mapred.OutputCommitter;
-+import org.apache.hadoop.mapred.RecordReader;
-+import org.apache.hadoop.mapred.Reporter;
-+import org.apache.hadoop.mapred.RunningJob;
-+import org.apache.hadoop.mapred.TaskAttemptContext;
-+import org.apache.hadoop.mapred.TaskCompletionEvent;
-+import org.apache.hadoop.mapred.TaskID;
-+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
-+import org.apache.hadoop.mapred.lib.CombineFileSplit;
-+import org.apache.hadoop.mapred.lib.NullOutputFormat;
-+import org.apache.hadoop.mapreduce.Job;
-+import org.apache.hadoop.mapreduce.TaskAttemptID;
-+import org.apache.hadoop.mapreduce.task.JobContextImpl;
-+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-+import org.apache.hadoop.security.UserGroupInformation;
-+import org.apache.hadoop.security.token.Token;
-+import org.apache.hadoop.security.token.TokenIdentifier;
-+import org.apache.hadoop.security.token.TokenSelector;
-+import org.apache.hadoop.tools.HadoopArchives;
-+import org.apache.hadoop.util.Progressable;
-+import org.apache.hadoop.util.ToolRunner;
-+
-+/**
-+ * Implemention of shims against Hadoop 0.23.0.
-+ */
-+public class Hadoop23Shims implements HadoopShims {
-+ public boolean usesJobShell() {
-+ return false;
-+ }
-+
-+ public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
-+ throws IOException {
-+
-+ return fs.deleteOnExit(path);
-+ }
-+
-+ public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
-+ throws IOException {
-+ // gone in 0.18+
-+ }
-+
-+ public boolean isJobPreparing(RunningJob job) throws IOException {
-+ return job.getJobState() == JobStatus.PREP;
-+ }
-+ /**
-+ * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
-+ */
-+ public void setTmpFiles(String prop, String files) {
-+ // gone in 20+
-+ }
-+
-+ public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
-+ int numDataNodes,
-+ boolean format,
-+ String[] racks) throws IOException {
-+ return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
-+ }
-+
-+ /**
-+ * MiniDFSShim.
-+ *
-+ */
-+ public class MiniDFSShim implements HadoopShims.MiniDFSShim {
-+ private final MiniDFSCluster cluster;
-+
-+ public MiniDFSShim(MiniDFSCluster cluster) {
-+ this.cluster = cluster;
-+ }
-+
-+ public FileSystem getFileSystem() throws IOException {
-+ return cluster.getFileSystem();
-+ }
-+
-+ public void shutdown() {
-+ cluster.shutdown();
-+ }
-+ }
-+
-+ /**
-+ * We define this function here to make the code compatible between
-+ * hadoop 0.17 and hadoop 0.20.
-+ *
-+ * A Hive binary that compiled Text.compareTo(Text) against hadoop 0.20 won't
-+ * work with hadoop 0.17, because in hadoop 0.20 Text.compareTo(Text) is
-+ * implemented in org.apache.hadoop.io.BinaryComparable, and the Java compiler
-+ * references that class, which is not available in hadoop 0.17.
-+ */
-+ public int compareText(Text a, Text b) {
-+ return a.compareTo(b);
-+ }
-+
-+ @Override
-+ public long getAccessTime(FileStatus file) {
-+ return file.getAccessTime();
-+ }
-+
-+ public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
-+ return new CombineFileInputFormatShim() {
-+ @Override
-+ public RecordReader getRecordReader(InputSplit split,
-+ JobConf job, Reporter reporter) throws IOException {
-+ throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
-+ }
-+ };
-+ }
-+
-+ public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
-+ long shrinkedLength;
-+ boolean _isShrinked;
-+ public InputSplitShim() {
-+ super();
-+ _isShrinked = false;
-+ }
-+
-+ public InputSplitShim(CombineFileSplit old) throws IOException {
-+ super(old);
-+ _isShrinked = false;
-+ }
-+
-+ @Override
-+ public void shrinkSplit(long length) {
-+ _isShrinked = true;
-+ shrinkedLength = length;
-+ }
-+
-+ public boolean isShrinked() {
-+ return _isShrinked;
-+ }
-+
-+ public long getShrinkedLength() {
-+ return shrinkedLength;
-+ }
-+
-+ @Override
-+ public void readFields(DataInput in) throws IOException {
-+ super.readFields(in);
-+ _isShrinked = in.readBoolean();
-+ if (_isShrinked) {
-+ shrinkedLength = in.readLong();
-+ }
-+ }
-+
-+ @Override
-+ public void write(DataOutput out) throws IOException {
-+ super.write(out);
-+ out.writeBoolean(_isShrinked);
-+ if (_isShrinked) {
-+ out.writeLong(shrinkedLength);
-+ }
-+ }
-+ }
-+
-+ /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
-+ * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
-+ * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
-+ */
-+ public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
-+
-+ static final Class[] constructorSignature = new Class[] {
-+ InputSplit.class,
-+ Configuration.class,
-+ Reporter.class,
-+ Integer.class
-+ };
-+
-+ protected CombineFileSplit split;
-+ protected JobConf jc;
-+ protected Reporter reporter;
-+ protected Class<RecordReader<K, V>> rrClass;
-+ protected Constructor<RecordReader<K, V>> rrConstructor;
-+ protected FileSystem fs;
-+
-+ protected int idx;
-+ protected long progress;
-+ protected RecordReader<K, V> curReader;
-+ protected boolean isShrinked;
-+ protected long shrinkedLength;
-+
-+ public boolean next(K key, V value) throws IOException {
-+
-+ while ((curReader == null)
-+ || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
-+ value)) {
-+ if (!initNextRecordReader(key)) {
-+ return false;
-+ }
-+ }
-+ return true;
-+ }
-+
-+ public K createKey() {
-+ K newKey = curReader.createKey();
-+ return (K)(new CombineHiveKey(newKey));
-+ }
-+
-+ public V createValue() {
-+ return curReader.createValue();
-+ }
-+
-+ /**
-+ * Return the amount of data processed.
-+ */
-+ public long getPos() throws IOException {
-+ return progress;
-+ }
-+
-+ public void close() throws IOException {
-+ if (curReader != null) {
-+ curReader.close();
-+ curReader = null;
-+ }
-+ }
-+
-+ /**
-+ * Return progress based on the amount of data processed so far.
-+ */
-+ public float getProgress() throws IOException {
-+ return Math.min(1.0f, progress / (float) (split.getLength()));
-+ }
-+
-+ /**
-+ * A generic RecordReader that can hand out different recordReaders
-+ * for each chunk in the CombineFileSplit.
-+ */
-+ public CombineFileRecordReader(JobConf job, CombineFileSplit split,
-+ Reporter reporter,
-+ Class<RecordReader<K, V>> rrClass)
-+ throws IOException {
-+ this.split = split;
-+ this.jc = job;
-+ this.rrClass = rrClass;
-+ this.reporter = reporter;
-+ this.idx = 0;
-+ this.curReader = null;
-+ this.progress = 0;
-+
-+ isShrinked = false;
-+
-+ assert (split instanceof InputSplitShim);
-+ if (((InputSplitShim) split).isShrinked()) {
-+ isShrinked = true;
-+ shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
-+ }
-+
-+ try {
-+ rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
-+ rrConstructor.setAccessible(true);
-+ } catch (Exception e) {
-+ throw new RuntimeException(rrClass.getName() +
-+ " does not have valid constructor", e);
-+ }
-+ initNextRecordReader(null);
-+ }
-+
-+ /**
-+ * Advance the current reader, routing any exception through the Hive IO exception handler.
-+ * @param key
-+ * @param value
-+ * @return
-+ * @throws IOException
-+ */
-+ private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
-+ try {
-+ return curReader.next(key, value);
-+ } catch (Exception e) {
-+ return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
-+ }
-+ }
-+
-+ /**
-+ * Get the record reader for the next chunk in this CombineFileSplit.
-+ */
-+ protected boolean initNextRecordReader(K key) throws IOException {
-+
-+ if (curReader != null) {
-+ curReader.close();
-+ curReader = null;
-+ if (idx > 0) {
-+ progress += split.getLength(idx - 1); // done processing so far
-+ }
-+ }
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.util.Progressable;
+
-+ // if all chunks have been processed or reached the length, nothing more to do.
-+ if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
-+ return false;
-+ }
++/****************************************************************
++ * A Proxy for LocalFileSystem
++ *
++ * Serves URIs corresponding to the 'pfile:///' namespace using
++ * a LocalFileSystem
++ *****************************************************************/
+
-+ // get a record reader for the idx-th chunk
-+ try {
-+ curReader = rrConstructor.newInstance(new Object[]
-+ {split, jc, reporter, Integer.valueOf(idx)});
++public class ProxyLocalFileSystem extends FilterFileSystem {
+
-+ // change the key if need be
-+ if (key != null) {
-+ K newKey = curReader.createKey();
-+ ((CombineHiveKey)key).setKey(newKey);
-+ }
++ protected LocalFileSystem localFs;
+
-+ // setup some helper config variables.
-+ jc.set("map.input.file", split.getPath(idx).toString());
-+ jc.setLong("map.input.start", split.getOffset(idx));
-+ jc.setLong("map.input.length", split.getLength(idx));
-+ } catch (Exception e) {
-+ curReader=HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
-+ }
-+ idx++;
-+ return true;
-+ }
++ public ProxyLocalFileSystem() {
++ localFs = new LocalFileSystem();
+ }
+
-+ public abstract static class CombineFileInputFormatShim<K, V> extends
-+ CombineFileInputFormat<K, V>
-+ implements HadoopShims.CombineFileInputFormatShim<K, V> {
++ public ProxyLocalFileSystem(FileSystem fs) {
++ throw new RuntimeException("Unsupported Constructor");
++ }
+
-+ public Path[] getInputPathsShim(JobConf conf) {
-+ try {
-+ return FileInputFormat.getInputPaths(conf);
-+ } catch (Exception e) {
-+ throw new RuntimeException(e);
-+ }
-+ }
++ @Override
++ public void initialize(URI name, Configuration conf) throws IOException {
++ // create a proxy for the local filesystem
++ // the scheme/authority serving as the proxy is derived
++ // from the supplied URI
+
-+ @Override
-+ public void createPool(JobConf conf, PathFilter... filters) {
-+ super.createPool(conf, filters);
-+ }
++ String scheme = name.getScheme();
++ String authority = name.getAuthority() != null ? name.getAuthority() : "";
++ String proxyUriString = scheme + "://" + authority + "/";
++ fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
+
-+ @Override
-+ public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
-+ long minSize = job.getLong("mapred.min.split.size", 0);
++ fs.initialize(name, conf);
++ }
++}
+
+Property changes on: shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+___________________________________________________________________
+Added: svn:eol-style
+ + native
+
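(Context for the move above: ProxyLocalFileSystem answers pfile:/// URIs by
wrapping a LocalFileSystem, and initialize() keeps only the scheme and
authority of the requested URI as the proxy's identity. Below is a minimal,
self-contained sketch of that derivation; the sample URI is illustrative and
not taken from the patch.)

    import java.net.URI;

    // Sketch of the URI derivation in ProxyLocalFileSystem.initialize():
    // only the scheme and authority of the requested name identify the
    // proxy; a plain LocalFileSystem does the actual I/O underneath.
    public class PfileUriSketch {
        public static void main(String[] args) {
            URI name = URI.create("pfile:///tmp/warehouse/t1"); // hypothetical input
            String scheme = name.getScheme();
            String authority = name.getAuthority() != null ? name.getAuthority() : "";
            String proxyUriString = scheme + "://" + authority + "/";
            System.out.println(proxyUriString); // prints pfile:///
        }
    }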
+Index: shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java
+===================================================================
+--- shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java (revision 0)
++++ shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java (revision 0)
+@@ -0,0 +1,273 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+
-+ // For backward compatibility, let the above parameter be used
-+ if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
-+ super.setMinSplitSizeNode(minSize);
-+ }
++package org.apache.hadoop.fs;
+
-+ if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
-+ super.setMinSplitSizeRack(minSize);
-+ }
++import java.io.IOException;
++import java.net.URI;
++import java.net.URISyntaxException;
+
-+ if (job.getLong("mapred.max.split.size", 0) == 0) {
-+ super.setMaxSplitSize(minSize);
-+ }
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.util.Progressable;
+
-+ CombineFileSplit[] splits = (CombineFileSplit[]) super.getSplits(job, numSplits);
++/****************************************************************
++ * A FileSystem that can serve a given scheme/authority using some
++ * other file system. In that sense, it serves as a proxy for the
++ * real/underlying file system
++ *****************************************************************/
+
-+ InputSplitShim[] isplits = new InputSplitShim[splits.length];
-+ for (int pos = 0; pos < splits.length; pos++) {
-+ isplits[pos] = new InputSplitShim(splits[pos]);
-+ }
++public class ProxyFileSystem extends FilterFileSystem {
+
-+ return isplits;
-+ }
++ protected String myScheme;
++ protected String myAuthority;
++ protected URI myUri;
+
-+ public InputSplitShim getInputSplitShim() throws IOException {
-+ return new InputSplitShim();
-+ }
++ protected String realScheme;
++ protected String realAuthority;
++ protected URI realUri;
+
-+ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
-+ Reporter reporter,
-+ Class<RecordReader<K, V>> rrClass)
-+ throws IOException {
-+ CombineFileSplit cfSplit = (CombineFileSplit) split;
-+ return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
-+ }
++
+
++ private Path swizzleParamPath(Path p) {
++ return new Path(realScheme, realAuthority, p.toUri().getPath());
+ }
+
-+ public String getInputFormatClassName() {
-+ return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
++ private Path swizzleReturnPath(Path p) {
++ return new Path(myScheme, myAuthority, p.toUri().getPath());
+ }
+
-+ String[] ret = new String[2];
-+
-+ @Override
-+ public String[] getTaskJobIDs(TaskCompletionEvent t) {
-+ TaskID tid = t.getTaskAttemptId().getTaskID();
-+ ret[0] = tid.toString();
-+ ret[1] = tid.getJobID().toString();
++ private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
++ FileStatus ret =
++ new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
++ orig.getBlockSize(), orig.getModificationTime(),
++ orig.getAccessTime(), orig.getPermission(),
++ orig.getOwner(), orig.getGroup(),
++ isParam ? swizzleParamPath(orig.getPath()) :
++ swizzleReturnPath(orig.getPath()));
+ return ret;
+ }
+
-+ public void setFloatConf(Configuration conf, String varName, float val) {
-+ conf.setFloat(varName, val);
++ public ProxyFileSystem() {
++ throw new RuntimeException("Unsupported constructor");
++ }
++
++ public ProxyFileSystem(FileSystem fs) {
++ throw new RuntimeException("Unsupported constructor");
+ }
+
-+ @Override
-+ public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
-+ String archiveName) throws Exception {
++ /**
++ * Create a proxy file system for fs.
++ *
++ * @param fs FileSystem to create proxy for
++ * @param myUri URI to use as proxy. Only the scheme and authority from
++ * this are used right now
++ */
++ public ProxyFileSystem(FileSystem fs, URI myUri) {
++ super(fs);
+
-+ HadoopArchives har = new HadoopArchives(conf);
-+ List<String> args = new ArrayList<String>();
++ URI realUri = fs.getUri();
++ this.realScheme = realUri.getScheme();
++ this.realAuthority = realUri.getAuthority();
++ this.realUri = realUri;
+
-+ if (conf.get("hive.archive.har.parentdir.settable") == null) {
-+ throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
-+ }
-+ boolean parentSettable =
-+ conf.getBoolean("hive.archive.har.parentdir.settable", false);
++ this.myScheme = myUri.getScheme();
++ this.myAuthority = myUri.getAuthority();
++ this.myUri = myUri;
++ }
+
-+ if (parentSettable) {
-+ args.add("-archiveName");
-+ args.add(archiveName);
-+ args.add("-p");
-+ args.add(sourceDir.toString());
-+ args.add(destDir.toString());
-+ } else {
-+ args.add("-archiveName");
-+ args.add(archiveName);
-+ args.add(sourceDir.toString());
-+ args.add(destDir.toString());
++ @Override
++ public void initialize(URI name, Configuration conf) throws IOException {
++ try {
++ URI realUri = new URI(realScheme, realAuthority,
++ name.getPath(), name.getQuery(), name.getFragment());
++ super.initialize(realUri, conf);
++ } catch (URISyntaxException e) {
++ throw new RuntimeException(e);
+ }
++ }
+
-+ return ToolRunner.run(har, args.toArray(new String[0]));
++ @Override
++ public URI getUri() {
++ return myUri;
+ }
+
-+ public static class NullOutputCommitter extends OutputCommitter {
-+ @Override
-+ public void setupJob(JobContext jobContext) { }
-+ @Override
-+ public void cleanupJob(JobContext jobContext) { }
++ @Override
++ public String getName() {
++ return getUri().toString();
++ }
+
-+ @Override
-+ public void setupTask(TaskAttemptContext taskContext) { }
-+ @Override
-+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-+ return false;
-+ }
-+ @Override
-+ public void commitTask(TaskAttemptContext taskContext) { }
-+ @Override
-+ public void abortTask(TaskAttemptContext taskContext) { }
++ @Override
++ public Path makeQualified(Path path) {
++ return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
+ }
+
-+ public void setNullOutputFormat(JobConf conf) {
-+ conf.setOutputFormat(NullOutputFormat.class);
-+ conf.setOutputCommitter(Hadoop23Shims.NullOutputCommitter.class);
+
-+ // option to bypass job setup and cleanup was introduced in Hadoop 0.21 (MAPREDUCE-463)
-+ // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-+ conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
++ @Override
++ protected void checkPath(Path path) {
++ super.checkPath(swizzleParamPath(path));
++ }
+
-+ // option to bypass the task-cleanup task was introduced in Hadoop 0.23 (MAPREDUCE-2206)
-+ // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-+ conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
++ @Override
++ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
++ long len) throws IOException {
++ return super.getFileBlockLocations(swizzleFileStatus(file, true),
++ start, len);
+ }
+
+ @Override
-+ public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
-+ return UserGroupInformation.getCurrentUser();
++ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
++ return super.open(swizzleParamPath(f), bufferSize);
+ }
+
+ @Override
-+ public boolean isSecureShimImpl() {
-+ return true;
++ public FSDataOutputStream append(Path f, int bufferSize,
++ Progressable progress) throws IOException {
++ return super.append(swizzleParamPath(f), bufferSize, progress);
+ }
+
+ @Override
-+ public String getShortUserName(UserGroupInformation ugi) {
-+ return ugi.getShortUserName();
++ public FSDataOutputStream create(Path f, FsPermission permission,
++ boolean overwrite, int bufferSize, short replication, long blockSize,
++ Progressable progress) throws IOException {
++ return super.create(swizzleParamPath(f), permission,
++ overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
-+ public String getTokenStrForm(String tokenSignature) throws IOException {
-+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-+ TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
++ public boolean setReplication(Path src, short replication) throws IOException {
++ return super.setReplication(swizzleParamPath(src), replication);
++ }
+
-+ Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-+ tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
-+ return token != null ? token.encodeToUrlString() : null;
++ @Override
++ public boolean rename(Path src, Path dst) throws IOException {
++ return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+ }
+
+ @Override
-+ public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
-+ JobTrackerState state;
-+ switch (clusterStatus.getJobTrackerStatus()) {
-+ case INITIALIZING:
-+ return JobTrackerState.INITIALIZING;
-+ case RUNNING:
-+ return JobTrackerState.RUNNING;
-+ default:
-+ String errorMsg = "Unrecognized JobTracker state: " + clusterStatus.getJobTrackerStatus();
-+ throw new Exception(errorMsg);
++ public boolean delete(Path f, boolean recursive) throws IOException {
++ return super.delete(swizzleParamPath(f), recursive);
++ }
++
++ @Override
++ public boolean deleteOnExit(Path f) throws IOException {
++ return super.deleteOnExit(swizzleParamPath(f));
++ }
++
++ @Override
++ public FileStatus[] listStatus(Path f) throws IOException {
++ FileStatus[] orig = super.listStatus(swizzleParamPath(f));
++ FileStatus[] ret = new FileStatus[orig.length];
++ for (int i=0; i<orig.length; i++) {
++ ret[i] = swizzleFileStatus(orig[i], false);
+ }
++ return ret;
+ }
+
+ @Override
-+ public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
-+ return new TaskAttemptContextImpl(conf, new TaskAttemptID()) {
-+ @Override
-+ public void progress() {
-+ progressable.progress();
-+ }
-+ };
++ public Path getHomeDirectory() {
++ return swizzleReturnPath(super.getHomeDirectory());
+ }
+
+ @Override
-+ public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) {
-+ return new JobContextImpl(job.getConfiguration(), job.getJobID());
++ public void setWorkingDirectory(Path newDir) {
++ super.setWorkingDirectory(swizzleParamPath(newDir));
+ }
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
-___________________________________________________________________
-Added: svn:eol-style
- + native
-
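(A note on the InputSplitShim removed above: its write()/readFields() pair
uses an optional-field layout, writing a boolean flag first and the extra
long only when the flag is set, which keeps old and new splits
wire-compatible. The sketch below shows just that pattern with hypothetical
names; it is not the shim's API.)

    import java.io.*;

    // Optional-field Writable pattern: flag first, payload only if flagged.
    public class ShrinkableSketch {
        private boolean isShrinked = false;
        private long shrinkedLength;

        public void shrink(long length) { isShrinked = true; shrinkedLength = length; }

        public void write(DataOutput out) throws IOException {
            out.writeBoolean(isShrinked);
            if (isShrinked) {
                out.writeLong(shrinkedLength);
            }
        }

        public void readFields(DataInput in) throws IOException {
            isShrinked = in.readBoolean();
            if (isShrinked) {
                shrinkedLength = in.readLong();
            }
        }

        public static void main(String[] args) throws IOException {
            ShrinkableSketch s = new ShrinkableSketch();
            s.shrink(1024L);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            s.write(new DataOutputStream(bos));
            ShrinkableSketch r = new ShrinkableSketch();
            r.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(r.isShrinked + " " + r.shrinkedLength); // true 1024
        }
    }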
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java (revision 0)
-@@ -0,0 +1,56 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hadoop.hive.shims;
-+
-+import java.io.IOException;
-+
-+import org.mortbay.jetty.bio.SocketConnector;
-+import org.mortbay.jetty.handler.RequestLogHandler;
-+import org.mortbay.jetty.webapp.WebAppContext;
-+
-+/**
-+ * Jetty23Shims.
-+ *
-+ */
-+public class Jetty23Shims implements JettyShims {
-+ public Server startServer(String listen, int port) throws IOException {
-+ Server s = new Server();
-+ s.setupListenerHostPort(listen, port);
-+ return s;
++
++ @Override
++ public Path getWorkingDirectory() {
++ return swizzleReturnPath(super.getWorkingDirectory());
++ }
++
++ @Override
++ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
++ return super.mkdirs(swizzleParamPath(f), permission);
+ }
+
-+ private static class Server extends org.mortbay.jetty.Server implements JettyShims.Server {
-+ public void addWar(String war, String contextPath) {
-+ WebAppContext wac = new WebAppContext();
-+ wac.setContextPath(contextPath);
-+ wac.setWar(war);
-+ RequestLogHandler rlh = new RequestLogHandler();
-+ rlh.setHandler(wac);
-+ this.addHandler(rlh);
-+ }
++ @Override
++ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
++ throws IOException {
++ super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
++ }
+
-+ public void setupListenerHostPort(String listen, int port)
-+ throws IOException {
++ @Override
++ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
++ Path[] srcs, Path dst)
++ throws IOException {
++ super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
++ }
++
++ @Override
++ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
++ Path src, Path dst)
++ throws IOException {
++ super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
++ }
+
-+ SocketConnector connector = new SocketConnector();
-+ connector.setPort(port);
-+ connector.setHost(listen);
-+ this.addConnector(connector);
-+ }
++ @Override
++ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
++ throws IOException {
++ super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
+ }
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
-___________________________________________________________________
-Added: svn:eol-style
- + native
-
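(Context for the Jetty23Shims removal above: the shim assembles a Jetty 6
(org.mortbay) server from a blocking SocketConnector plus a WebAppContext
wrapped in a RequestLogHandler. The standalone sketch below uses only Jetty 6
calls that also appear in the patch; the bind address, port, war path, and
context path are illustrative assumptions.)

    import org.mortbay.jetty.Server;
    import org.mortbay.jetty.bio.SocketConnector;
    import org.mortbay.jetty.handler.RequestLogHandler;
    import org.mortbay.jetty.webapp.WebAppContext;

    // Standalone equivalent of what startServer() plus addWar() assemble.
    public class Jetty6Sketch {
        public static void main(String[] args) throws Exception {
            Server server = new Server();

            SocketConnector connector = new SocketConnector(); // blocking-IO connector
            connector.setHost("127.0.0.1"); // assumption: local bind address
            connector.setPort(9999);        // assumption: arbitrary free port
            server.addConnector(connector);

            WebAppContext wac = new WebAppContext();
            wac.setContextPath("/hwi");      // assumption: HWI-style context
            wac.setWar("/tmp/hive-hwi.war"); // assumption: war location
            RequestLogHandler rlh = new RequestLogHandler();
            rlh.setHandler(wac);
            server.addHandler(rlh);

            server.start();
            server.join();
        }
    }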
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java (revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java (revision 0)
-@@ -0,0 +1,66 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
+
-+package org.apache.hadoop.hive.shims;
++ @Override
++ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
++ throws IOException {
++ return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
++ }
+
-+import java.io.IOException;
++ @Override
++ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
++ throws IOException {
++ super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
++ }
+
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.ContentSummary;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.HarFileSystem;
-+import org.apache.hadoop.fs.Path;
++ @Override
++ public ContentSummary getContentSummary(Path f) throws IOException {
++ return super.getContentSummary(swizzleParamPath(f));
++ }
+
-+/**
-+ * HiveHarFileSystem - fixes issues with Hadoop's HarFileSystem
-+ *
-+ */
-+public class HiveHarFileSystem extends HarFileSystem {
++ @Override
++ public FileStatus getFileStatus(Path f) throws IOException {
++ return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
++ }
+
+ @Override
-+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-+ long len) throws IOException {
++ public FileChecksum getFileChecksum(Path f) throws IOException {
++ return super.getFileChecksum(swizzleParamPath(f));
++ }
++
++ @Override
++ public void setOwner(Path p, String username, String groupname
++ ) throws IOException {
++ super.setOwner(swizzleParamPath(p), username, groupname);
++ }
+
-+ // In some places (e.g. FileInputFormat) this BlockLocation is used to
-+ // figure out sizes/offsets and so a completely blank one will not work.
-+ String [] hosts = {"DUMMY_HOST"};
-+ return new BlockLocation[]{new BlockLocation(null, hosts, 0, file.getLen())};
++ @Override
++ public void setTimes(Path p, long mtime, long atime
++ ) throws IOException {
++ super.setTimes(swizzleParamPath(p), mtime, atime);
+ }
+
+ @Override
-+ public ContentSummary getContentSummary(Path f) throws IOException {
-+ // HarFileSystem has a bug where this method does not work properly
-+ // if the underlying FS is HDFS. See MAPREDUCE-1877 for more
-+ // information. This method is from FileSystem.
-+ FileStatus status = getFileStatus(f);
-+ if (!status.isDir()) {
-+ // f is a file
-+ return new ContentSummary(status.getLen(), 1, 0);
-+ }
-+ // f is a directory
-+ long[] summary = {0, 0, 1};
-+ for(FileStatus s : listStatus(f)) {
-+ ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
-+ new ContentSummary(s.getLen(), 1, 0);
-+ summary[0] += c.getLength();
-+ summary[1] += c.getFileCount();
-+ summary[2] += c.getDirectoryCount();
-+ }
-+ return new ContentSummary(summary[0], summary[1], summary[2]);
++ public void setPermission(Path p, FsPermission permission
++ ) throws IOException {
++ super.setPermission(swizzleParamPath(p), permission);
+ }
+}
++
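(The swizzleParamPath()/swizzleReturnPath() helpers added above are the heart
of ProxyFileSystem: incoming paths are rewritten from the proxy scheme to the
real scheme, and results are rewritten back, so callers only ever see the
proxy's URIs. A minimal sketch of that round trip follows; the schemes and
sample path are assumptions, and it needs org.apache.hadoop.fs.Path on the
classpath.)

    import org.apache.hadoop.fs.Path;

    // Round trip through the two swizzle directions, mirroring the patch.
    public class SwizzleSketch {
        static final String MY_SCHEME = "pfile", MY_AUTHORITY = "";    // proxy identity
        static final String REAL_SCHEME = "file", REAL_AUTHORITY = ""; // underlying FS

        static Path swizzleParam(Path p) {  // proxy -> real, as in swizzleParamPath
            return new Path(REAL_SCHEME, REAL_AUTHORITY, p.toUri().getPath());
        }

        static Path swizzleReturn(Path p) { // real -> proxy, as in swizzleReturnPath
            return new Path(MY_SCHEME, MY_AUTHORITY, p.toUri().getPath());
        }

        public static void main(String[] args) {
            Path in = new Path("pfile:///tmp/warehouse/t1"); // hypothetical path
            Path real = swizzleParam(in);
            System.out.println(real);                // file:///tmp/warehouse/t1
            System.out.println(swizzleReturn(real)); // pfile:///tmp/warehouse/t1
        }
    }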
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java
+Property changes on: shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java
[... 398 lines stripped ...]