You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2013/03/15 20:45:05 UTC

git commit: Added async support to client.

Updated Branches:
  refs/heads/master fe50e9b2f -> 511fae36d


Added async support to client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-knox/commit/511fae36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-knox/tree/511fae36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-knox/diff/511fae36

Branch: refs/heads/master
Commit: 511fae36d5779062b26aaa0b497df17813e6a56f
Parents: fe50e9b
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Fri Mar 15 15:45:00 2013 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Fri Mar 15 15:45:00 2013 -0400

----------------------------------------------------------------------
 .../hadoop/gateway/shell/AbstractRequest.java      |   16 +++-
 .../hadoop/gateway/shell/AbstractResponse.java     |    4 +
 .../org/apache/hadoop/gateway/shell/Hadoop.java    |   50 ++---------
 .../hadoop/gateway/shell/hdfs/Example.groovy       |    3 +
 .../org/apache/hadoop/gateway/shell/hdfs/Get.java  |   29 ++++---
 .../org/apache/hadoop/gateway/shell/hdfs/Ls.java   |   21 +++--
 .../apache/hadoop/gateway/shell/hdfs/Mkdir.java    |   20 +++--
 .../org/apache/hadoop/gateway/shell/hdfs/Put.java  |   65 ++++++++-------
 .../org/apache/hadoop/gateway/shell/hdfs/Rm.java   |   21 +++--
 .../org/apache/hadoop/gateway/shell/job/Hive.java  |   27 ++++---
 .../org/apache/hadoop/gateway/shell/job/Java.java  |   30 ++++---
 .../org/apache/hadoop/gateway/shell/job/Pig.java   |   28 ++++---
 .../org/apache/hadoop/gateway/shell/job/Queue.java |   14 ++-
 .../apache/hadoop/gateway/shell/job/Status.java    |   14 ++-
 .../hadoop/gateway/shell/workflow/Status.java      |   16 ++--
 .../hadoop/gateway/shell/workflow/Submit.java      |   30 ++++---
 16 files changed, 219 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
index f850e39..1e5eeee 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
@@ -10,6 +10,8 @@ import org.apache.http.message.BasicNameValuePair;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -28,7 +30,7 @@ import java.util.List;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-public abstract class AbstractRequest {
+public abstract class AbstractRequest<T> {
 
   private Hadoop hadoop;
 
@@ -41,7 +43,7 @@ public abstract class AbstractRequest {
   }
 
   protected HttpResponse execute( HttpRequest request ) throws IOException {
-    return hadoop.execute( request );
+    return hadoop.executeNow( request );
   }
 
   protected URIBuilder uri( String... parts ) throws URISyntaxException {
@@ -60,4 +62,14 @@ public abstract class AbstractRequest {
     }
   }
 
+  abstract protected Callable<T> callable();
+
+  public T now() throws Exception, URISyntaxException {
+    return callable().call();
+  }
+
+  public Future<T> later() {
+    return hadoop().executeLater( callable() );
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
index 1028fff..4dde1b2 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
@@ -55,6 +55,10 @@ public abstract class AbstractResponse {
     return consumed;
   }
 
+  public int getStatusCode() {
+    return response.getStatusLine().getStatusCode();
+  }
+
   public long getContentLength() {
     return response.getEntity().getContentLength();
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
index 47baf8c..4fcd3c9 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
@@ -39,6 +39,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.GeneralSecurityException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class Hadoop {
 
@@ -48,12 +52,14 @@ public class Hadoop {
   BasicHttpContext context;
   String username;
   String password;
+  ExecutorService executor;
 
   public static Hadoop login( String url, String username, String password ) throws URISyntaxException {
     return new Hadoop( url, username, password );
   }
 
   private Hadoop( String url, String username, String password ) throws HadoopException, URISyntaxException {
+    this.executor = Executors.newCachedThreadPool();
     this.base = url;
     this.username = username;
     this.password = password;
@@ -90,48 +96,12 @@ public class Hadoop {
     return base;
   }
 
-  public HttpResponse execute( HttpRequest request ) throws IOException {
+  public HttpResponse executeNow( HttpRequest request ) throws IOException {
     return client.execute( host, request, context );
   }
 
-//  SSLContext ctx = SSLContext.getInstance( "TLS" );
-//  KeyManager[] keyManagers = createKeyManagers( "jks", "target/test-classes/client-keystore.jks", "horton" );
-//  TrustManager[] trustManagers = createTrustManagers( "jks", "target/test-classes/client-truststore.jks", "horton" );
-//  ctx.init( keyManagers, trustManagers, new SecureRandom() );
-//
-//  SSLSocketFactory socketFactory = new SSLSocketFactory( ctx, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER );
-//
-//  SchemeRegistry schemes = new SchemeRegistry();
-//  schemes.register( new Scheme( "https", port, socketFactory ) );
-//  ClientConnectionManager cm = new BasicClientConnectionManager( schemes );
-//
-//  HttpClient client = new DefaultHttpClient( cm );
-//
-//  HttpGet get = new HttpGet( url );
-//  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-//  client.execute( get ).getEntity().writeTo( buffer );
-//  assertThat( buffer.toString(), equalTo( "<html>Hello!</html>" ) );
-//}
-//
-//  private static KeyManager[] createKeyManagers( String keyStoreType, String keyStorePath, String keyStorePassword ) throws Exception {
-//    KeyStore keyStore = loadKeyStore( keyStoreType, keyStorePath, keyStorePassword );
-//    KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
-//    kmf.init( keyStore, keyStorePassword.toCharArray() );
-//    return kmf.getKeyManagers();
-//  }
-//
-//  private static TrustManager[] createTrustManagers( String trustStoreType, String trustStorePath, String trustStorePassword ) throws Exception {
-//    KeyStore trustStore = loadKeyStore( trustStoreType, trustStorePath, trustStorePassword );
-//    TrustManagerFactory tmf = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm() );
-//    tmf.init( trustStore );
-//    return tmf.getTrustManagers();
-//  }
-//
-//  private static KeyStore loadKeyStore( String type, String path, String password ) throws IOException, NoSuchAlgorithmException, CertificateException, KeyStoreException {
-//    KeyStore keyStore = KeyStore.getInstance( type );
-//    InputStream keystoreInput = new FileInputStream( path );
-//    keyStore.load( keystoreInput, password.toCharArray() );
-//    return keyStore;
-//  }
+  public <T> Future<T> executeLater( Callable<T> callable ) {
+    return executor.submit( callable );
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
index f6ec4c4..4d39b93 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
@@ -38,3 +38,6 @@ hdfs.mkdir(hadoop).dir( "/tmp/test").now()
 hdfs.put(hadoop).file( inputFile ).to( "/tmp/test/input/LICENSE" ).now()
 
 hdfs.get(hadoop).file( "/Users/kevin.minder/Projects/gateway-0.2.0-SNAPSHOT/OUTPUT" ).from( "/tmp/test/input/LICENSE" ).now()
+
+future = hdfs.put(hadoop).file( inputFile ).to( "/tmp/test/input/LICENSE2" ).later()
+println future.get().statusCode

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
index 1413ebd..e9d3c41 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
@@ -21,25 +21,20 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.gateway.shell.AbstractRequest;
 import org.apache.hadoop.gateway.shell.AbstractResponse;
 import org.apache.hadoop.gateway.shell.Hadoop;
-import org.apache.hadoop.gateway.shell.HadoopException;
-import org.apache.http.Header;
 import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.util.EntityUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 public class Get {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
-    String from;
-    String to;
+    private String from;
+    private String to;
 
     Request( Hadoop hadoop ) {
       super( hadoop );
@@ -55,11 +50,17 @@ public class Get {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Hdfs.SERVICE_PATH, from );
-      addQueryParam( uri, "op", "OPEN" );
-      HttpGet request = new HttpGet( uri.build() );
-      return new Response( execute( request ), to );
+
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Hdfs.SERVICE_PATH, from );
+          addQueryParam( uri, "op", "OPEN" );
+          HttpGet request = new HttpGet( uri.build() );
+          return new Response( execute( request ), to );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
index 25688ac..9e1ce27 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
@@ -23,14 +23,12 @@ import org.apache.hadoop.gateway.shell.Hadoop;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Ls {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
     String dir;
 
@@ -43,11 +41,16 @@ class Ls {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
-      addQueryParam( uri, "op", "LISTSTATUS" );
-      HttpGet get = new HttpGet( uri.build() );
-      return new Response( execute( get ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
+          addQueryParam( uri, "op", "LISTSTATUS" );
+          HttpGet get = new HttpGet( uri.build() );
+          return new Response( execute( get ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
index 2212d9b..0f7f1ba 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
@@ -26,10 +26,11 @@ import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Mkdir {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
     String dir = null;
     String perm = null;
@@ -48,12 +49,17 @@ class Mkdir {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
-      addQueryParam( uri, "op", "MKDIRS" );
-      addQueryParam( uri, "permissions", perm );
-      HttpPut request = new HttpPut( uri.build() );
-      return new Response( execute( request ) );
+    public Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
+          addQueryParam( uri, "op", "MKDIRS" );
+          addQueryParam( uri, "permissions", perm );
+          HttpPut request = new HttpPut( uri.build() );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
index 08fc877..9510a64 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
@@ -32,16 +32,18 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.util.EntityUtils;
 
 import java.io.File;
-import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
 class Put {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
-    String text;
-    String from;
-    String to;
+    private String text;
+    private String file;
+    private String to;
 
     Request( Hadoop hadoop ) {
       super( hadoop );
@@ -53,7 +55,7 @@ class Put {
     }
 
     public Request file( String file ) {
-      this.from = file;
+      this.file = file;
       return this;
     }
 
@@ -62,29 +64,34 @@ class Put {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Hdfs.SERVICE_PATH, to );
-      addQueryParam( uri, "op", "CREATE" );
-      HttpPut nn = new HttpPut( uri.build() );
-      HttpResponse r = execute( nn );
-      if( r.getStatusLine().getStatusCode() != HttpStatus.SC_TEMPORARY_REDIRECT ) {
-        throw new HadoopException( r.getStatusLine().toString() );
-      }
-      EntityUtils.consumeQuietly( r.getEntity() );
-      Header[] h = r.getHeaders( "Location" );
-      if( h == null || h.length != 1 ) {
-        throw new HadoopException( "Invalid Location header." );
-      }
-      String loc = h[0].getValue();
-      HttpPut dn = new HttpPut( loc );
-      HttpEntity e = null;
-      if( text != null ) {
-        e = new StringEntity( text );
-      } else if( from != null ) {
-        e = new FileEntity( new File( from ) );
-      }
-      dn.setEntity( e );
-      return new Response( execute( dn ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Hdfs.SERVICE_PATH, to );
+          addQueryParam( uri, "op", "CREATE" );
+          HttpPut nn = new HttpPut( uri.build() );
+          HttpResponse r = execute( nn );
+          if( r.getStatusLine().getStatusCode() != HttpStatus.SC_TEMPORARY_REDIRECT ) {
+            throw new HadoopException( r.getStatusLine().toString() );
+          }
+          EntityUtils.consumeQuietly( r.getEntity() );
+          Header[] h = r.getHeaders( "Location" );
+          if( h == null || h.length != 1 ) {
+            throw new HadoopException( "Invalid Location header." );
+          }
+          String loc = h[0].getValue();
+          HttpPut dn = new HttpPut( loc );
+          HttpEntity e = null;
+          if( text != null ) {
+            e = new StringEntity( text );
+          } else if( file != null ) {
+            e = new FileEntity( new File( file ) );
+          }
+          dn.setEntity( e );
+          return new Response( execute( dn ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
index fb2b2f8..955eb9f 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
@@ -25,11 +25,11 @@ import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Rm {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
     String file;
     Boolean recursive;
@@ -52,12 +52,17 @@ class Rm {
       return recursive( true );
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Hdfs.SERVICE_PATH, file );
-      addQueryParam( uri, "op", "DELETE" );
-      addQueryParam( uri, "recursive", recursive );
-      HttpDelete request = new HttpDelete( uri.build() );
-      return new Response( execute( request ) );
+    public Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Hdfs.SERVICE_PATH, file );
+          addQueryParam( uri, "op", "DELETE" );
+          addQueryParam( uri, "recursive", recursive );
+          HttpDelete request = new HttpDelete( uri.build() );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
index 2a8a410..edc8474 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
@@ -28,13 +28,13 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 public class Hive {
 
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
     private String group;
     private String file;
@@ -65,15 +65,20 @@ public class Hive {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Job.SERVICE_PATH, "/hive" );
-      addParam( params, "group", group );
-      addParam( params, "file", file );
-      addParam( params, "statusdir", statusDir );
-      UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
-      HttpPost request = new HttpPost( uri.build() );
-      request.setEntity( form );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Job.SERVICE_PATH, "/hive" );
+          addParam( params, "group", group );
+          addParam( params, "file", file );
+          addParam( params, "statusdir", statusDir );
+          UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+          HttpPost request = new HttpPost( uri.build() );
+          request.setEntity( form );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
index dc416ed..16b2ac4 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
@@ -32,10 +32,11 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 class Java {
   
-  static class Request extends AbstractRequest {
+  static class Request extends AbstractRequest<Response> {
 
     String jar;
     String app;
@@ -66,17 +67,22 @@ class Java {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Job.SERVICE_PATH, "/mapreduce/jar" );
-      List<NameValuePair> params = new ArrayList<NameValuePair>();
-      params.add( new BasicNameValuePair( "jar", jar ) );
-      params.add( new BasicNameValuePair( "class", app ) );
-      params.add( new BasicNameValuePair( "arg", input ) );
-      params.add( new BasicNameValuePair( "arg", output ) );
-      UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
-      HttpPost request = new HttpPost( uri.build() );
-      request.setEntity( form );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Job.SERVICE_PATH, "/mapreduce/jar" );
+          List<NameValuePair> params = new ArrayList<NameValuePair>();
+          params.add( new BasicNameValuePair( "jar", jar ) );
+          params.add( new BasicNameValuePair( "class", app ) );
+          params.add( new BasicNameValuePair( "arg", input ) );
+          params.add( new BasicNameValuePair( "arg", output ) );
+          UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+          HttpPost request = new HttpPost( uri.build() );
+          request.setEntity( form );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
index afa277d..d87831f 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 public class Pig {
 
@@ -65,17 +66,22 @@ public class Pig {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Job.SERVICE_PATH, "/pig" );
-      List<NameValuePair> params = new ArrayList<NameValuePair>();
-      addParam( params, "group", group );
-      addParam( params, "file", file );
-      addParam( params, "arg", arg );
-      addParam( params, "statusdir", statusDir );
-      UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
-      HttpPost request = new HttpPost( uri.build() );
-      request.setEntity( form );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Job.SERVICE_PATH, "/pig" );
+          List<NameValuePair> params = new ArrayList<NameValuePair>();
+          addParam( params, "group", group );
+          addParam( params, "file", file );
+          addParam( params, "arg", arg );
+          addParam( params, "statusdir", statusDir );
+          UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+          HttpPost request = new HttpPost( uri.build() );
+          request.setEntity( form );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
index 874694e..baf2bc3 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
@@ -26,6 +26,7 @@ import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Queue {
 
@@ -35,10 +36,15 @@ class Queue {
       super( hadoop );
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Job.SERVICE_PATH, "/queue" );
-      HttpGet request = new HttpGet( uri.build() );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Job.SERVICE_PATH, "/queue" );
+          HttpGet request = new HttpGet( uri.build() );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
index c3fdddf..5f8cb56 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
@@ -26,6 +26,7 @@ import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Status {
 
@@ -42,10 +43,15 @@ class Status {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Job.SERVICE_PATH, "/queue/", jobId );
-      HttpGet request = new HttpGet( uri.build() );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Job.SERVICE_PATH, "/queue/", jobId );
+          HttpGet request = new HttpGet( uri.build() );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
index 6b9d196..431f7c3 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
@@ -24,8 +24,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Status {
 
@@ -42,10 +41,15 @@ class Status {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Workflow.SERVICE_PATH, "/job/", jobId );
-      HttpGet request = new HttpGet( uri.build() );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Workflow.SERVICE_PATH, "/job/", jobId );
+          HttpGet request = new HttpGet( uri.build() );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
index ee72eae..2406e53 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
@@ -32,6 +32,7 @@ import org.apache.http.entity.StringEntity;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
 
 class Submit {
 
@@ -60,18 +61,23 @@ class Submit {
       return this;
     }
 
-    public Response now() throws IOException, URISyntaxException {
-      URIBuilder uri = uri( Workflow.SERVICE_PATH, "/jobs" );
-      addQueryParam( uri, "action", action );
-      HttpPost request = new HttpPost( uri.build() );
-      HttpEntity entity = null;
-      if( text != null ) {
-        entity = new StringEntity( text, ContentType.create( "application/xml", "UTF-8" ) );
-      } else if( file != null ) {
-        entity = new FileEntity( new File( file ), ContentType.create( "application/xml" ) );
-      }
-      request.setEntity( entity );
-      return new Response( execute( request ) );
+    protected Callable<Response> callable() {
+      return new Callable<Response>() {
+        @Override
+        public Response call() throws Exception {
+          URIBuilder uri = uri( Workflow.SERVICE_PATH, "/jobs" );
+          addQueryParam( uri, "action", action );
+          HttpPost request = new HttpPost( uri.build() );
+          HttpEntity entity = null;
+          if( text != null ) {
+            entity = new StringEntity( text, ContentType.create( "application/xml", "UTF-8" ) );
+          } else if( file != null ) {
+            entity = new FileEntity( new File( file ), ContentType.create( "application/xml" ) );
+          }
+          request.setEntity( entity );
+          return new Response( execute( request ) );
+        }
+      };
     }
 
   }