You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2013/03/15 20:45:05 UTC
git commit: Added async support to client.
Updated Branches:
refs/heads/master fe50e9b2f -> 511fae36d
Added async support to client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-knox/commit/511fae36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-knox/tree/511fae36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-knox/diff/511fae36
Branch: refs/heads/master
Commit: 511fae36d5779062b26aaa0b497df17813e6a56f
Parents: fe50e9b
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Fri Mar 15 15:45:00 2013 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Fri Mar 15 15:45:00 2013 -0400
----------------------------------------------------------------------
.../hadoop/gateway/shell/AbstractRequest.java | 16 +++-
.../hadoop/gateway/shell/AbstractResponse.java | 4 +
.../org/apache/hadoop/gateway/shell/Hadoop.java | 50 ++---------
.../hadoop/gateway/shell/hdfs/Example.groovy | 3 +
.../org/apache/hadoop/gateway/shell/hdfs/Get.java | 29 ++++---
.../org/apache/hadoop/gateway/shell/hdfs/Ls.java | 21 +++--
.../apache/hadoop/gateway/shell/hdfs/Mkdir.java | 20 +++--
.../org/apache/hadoop/gateway/shell/hdfs/Put.java | 65 ++++++++-------
.../org/apache/hadoop/gateway/shell/hdfs/Rm.java | 21 +++--
.../org/apache/hadoop/gateway/shell/job/Hive.java | 27 ++++---
.../org/apache/hadoop/gateway/shell/job/Java.java | 30 ++++---
.../org/apache/hadoop/gateway/shell/job/Pig.java | 28 ++++---
.../org/apache/hadoop/gateway/shell/job/Queue.java | 14 ++-
.../apache/hadoop/gateway/shell/job/Status.java | 14 ++-
.../hadoop/gateway/shell/workflow/Status.java | 16 ++--
.../hadoop/gateway/shell/workflow/Submit.java | 30 ++++---
16 files changed, 219 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
index f850e39..1e5eeee 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractRequest.java
@@ -10,6 +10,8 @@ import org.apache.http.message.BasicNameValuePair;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
/**
* Licensed to the Apache Software Foundation (ASF) under one
@@ -28,7 +30,7 @@ import java.util.List;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-public abstract class AbstractRequest {
+public abstract class AbstractRequest<T> {
private Hadoop hadoop;
@@ -41,7 +43,7 @@ public abstract class AbstractRequest {
}
protected HttpResponse execute( HttpRequest request ) throws IOException {
- return hadoop.execute( request );
+ return hadoop.executeNow( request );
}
protected URIBuilder uri( String... parts ) throws URISyntaxException {
@@ -60,4 +62,14 @@ public abstract class AbstractRequest {
}
}
+ abstract protected Callable<T> callable();
+
+ public T now() throws Exception, URISyntaxException {
+ return callable().call();
+ }
+
+ public Future<T> later() {
+ return hadoop().executeLater( callable() );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
index 1028fff..4dde1b2 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/AbstractResponse.java
@@ -55,6 +55,10 @@ public abstract class AbstractResponse {
return consumed;
}
+ public int getStatusCode() {
+ return response.getStatusLine().getStatusCode();
+ }
+
public long getContentLength() {
return response.getEntity().getContentLength();
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
index 47baf8c..4fcd3c9 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/Hadoop.java
@@ -39,6 +39,10 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class Hadoop {
@@ -48,12 +52,14 @@ public class Hadoop {
BasicHttpContext context;
String username;
String password;
+ ExecutorService executor;
public static Hadoop login( String url, String username, String password ) throws URISyntaxException {
return new Hadoop( url, username, password );
}
private Hadoop( String url, String username, String password ) throws HadoopException, URISyntaxException {
+ this.executor = Executors.newCachedThreadPool();
this.base = url;
this.username = username;
this.password = password;
@@ -90,48 +96,12 @@ public class Hadoop {
return base;
}
- public HttpResponse execute( HttpRequest request ) throws IOException {
+ public HttpResponse executeNow( HttpRequest request ) throws IOException {
return client.execute( host, request, context );
}
-// SSLContext ctx = SSLContext.getInstance( "TLS" );
-// KeyManager[] keyManagers = createKeyManagers( "jks", "target/test-classes/client-keystore.jks", "horton" );
-// TrustManager[] trustManagers = createTrustManagers( "jks", "target/test-classes/client-truststore.jks", "horton" );
-// ctx.init( keyManagers, trustManagers, new SecureRandom() );
-//
-// SSLSocketFactory socketFactory = new SSLSocketFactory( ctx, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER );
-//
-// SchemeRegistry schemes = new SchemeRegistry();
-// schemes.register( new Scheme( "https", port, socketFactory ) );
-// ClientConnectionManager cm = new BasicClientConnectionManager( schemes );
-//
-// HttpClient client = new DefaultHttpClient( cm );
-//
-// HttpGet get = new HttpGet( url );
-// ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-// client.execute( get ).getEntity().writeTo( buffer );
-// assertThat( buffer.toString(), equalTo( "<html>Hello!</html>" ) );
-//}
-//
-// private static KeyManager[] createKeyManagers( String keyStoreType, String keyStorePath, String keyStorePassword ) throws Exception {
-// KeyStore keyStore = loadKeyStore( keyStoreType, keyStorePath, keyStorePassword );
-// KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
-// kmf.init( keyStore, keyStorePassword.toCharArray() );
-// return kmf.getKeyManagers();
-// }
-//
-// private static TrustManager[] createTrustManagers( String trustStoreType, String trustStorePath, String trustStorePassword ) throws Exception {
-// KeyStore trustStore = loadKeyStore( trustStoreType, trustStorePath, trustStorePassword );
-// TrustManagerFactory tmf = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm() );
-// tmf.init( trustStore );
-// return tmf.getTrustManagers();
-// }
-//
-// private static KeyStore loadKeyStore( String type, String path, String password ) throws IOException, NoSuchAlgorithmException, CertificateException, KeyStoreException {
-// KeyStore keyStore = KeyStore.getInstance( type );
-// InputStream keystoreInput = new FileInputStream( path );
-// keyStore.load( keystoreInput, password.toCharArray() );
-// return keyStore;
-// }
+ public <T> Future<T> executeLater( Callable<T> callable ) {
+ return executor.submit( callable );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
index f6ec4c4..4d39b93 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Example.groovy
@@ -38,3 +38,6 @@ hdfs.mkdir(hadoop).dir( "/tmp/test").now()
hdfs.put(hadoop).file( inputFile ).to( "/tmp/test/input/LICENSE" ).now()
hdfs.get(hadoop).file( "/Users/kevin.minder/Projects/gateway-0.2.0-SNAPSHOT/OUTPUT" ).from( "/tmp/test/input/LICENSE" ).now()
+
+future = hdfs.put(hadoop).file( inputFile ).to( "/tmp/test/input/LICENSE2" ).later()
+println future.get().statusCode
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
index 1413ebd..e9d3c41 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Get.java
@@ -21,25 +21,20 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.gateway.shell.AbstractRequest;
import org.apache.hadoop.gateway.shell.AbstractResponse;
import org.apache.hadoop.gateway.shell.Hadoop;
-import org.apache.hadoop.gateway.shell.HadoopException;
-import org.apache.http.Header;
import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.util.EntityUtils;
import java.io.File;
import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
public class Get {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
- String from;
- String to;
+ private String from;
+ private String to;
Request( Hadoop hadoop ) {
super( hadoop );
@@ -55,11 +50,17 @@ public class Get {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Hdfs.SERVICE_PATH, from );
- addQueryParam( uri, "op", "OPEN" );
- HttpGet request = new HttpGet( uri.build() );
- return new Response( execute( request ), to );
+
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Hdfs.SERVICE_PATH, from );
+ addQueryParam( uri, "op", "OPEN" );
+ HttpGet request = new HttpGet( uri.build() );
+ return new Response( execute( request ), to );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
index 25688ac..9e1ce27 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Ls.java
@@ -23,14 +23,12 @@ import org.apache.hadoop.gateway.shell.Hadoop;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.util.EntityUtils;
-import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Ls {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
String dir;
@@ -43,11 +41,16 @@ class Ls {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
- addQueryParam( uri, "op", "LISTSTATUS" );
- HttpGet get = new HttpGet( uri.build() );
- return new Response( execute( get ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
+ addQueryParam( uri, "op", "LISTSTATUS" );
+ HttpGet get = new HttpGet( uri.build() );
+ return new Response( execute( get ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
index 2212d9b..0f7f1ba 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Mkdir.java
@@ -26,10 +26,11 @@ import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Mkdir {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
String dir = null;
String perm = null;
@@ -48,12 +49,17 @@ class Mkdir {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
- addQueryParam( uri, "op", "MKDIRS" );
- addQueryParam( uri, "permissions", perm );
- HttpPut request = new HttpPut( uri.build() );
- return new Response( execute( request ) );
+ public Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Hdfs.SERVICE_PATH, dir );
+ addQueryParam( uri, "op", "MKDIRS" );
+ addQueryParam( uri, "permissions", perm );
+ HttpPut request = new HttpPut( uri.build() );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
index 08fc877..9510a64 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Put.java
@@ -32,16 +32,18 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import java.io.File;
-import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
class Put {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
- String text;
- String from;
- String to;
+ private String text;
+ private String file;
+ private String to;
Request( Hadoop hadoop ) {
super( hadoop );
@@ -53,7 +55,7 @@ class Put {
}
public Request file( String file ) {
- this.from = file;
+ this.file = file;
return this;
}
@@ -62,29 +64,34 @@ class Put {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Hdfs.SERVICE_PATH, to );
- addQueryParam( uri, "op", "CREATE" );
- HttpPut nn = new HttpPut( uri.build() );
- HttpResponse r = execute( nn );
- if( r.getStatusLine().getStatusCode() != HttpStatus.SC_TEMPORARY_REDIRECT ) {
- throw new HadoopException( r.getStatusLine().toString() );
- }
- EntityUtils.consumeQuietly( r.getEntity() );
- Header[] h = r.getHeaders( "Location" );
- if( h == null || h.length != 1 ) {
- throw new HadoopException( "Invalid Location header." );
- }
- String loc = h[0].getValue();
- HttpPut dn = new HttpPut( loc );
- HttpEntity e = null;
- if( text != null ) {
- e = new StringEntity( text );
- } else if( from != null ) {
- e = new FileEntity( new File( from ) );
- }
- dn.setEntity( e );
- return new Response( execute( dn ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Hdfs.SERVICE_PATH, to );
+ addQueryParam( uri, "op", "CREATE" );
+ HttpPut nn = new HttpPut( uri.build() );
+ HttpResponse r = execute( nn );
+ if( r.getStatusLine().getStatusCode() != HttpStatus.SC_TEMPORARY_REDIRECT ) {
+ throw new HadoopException( r.getStatusLine().toString() );
+ }
+ EntityUtils.consumeQuietly( r.getEntity() );
+ Header[] h = r.getHeaders( "Location" );
+ if( h == null || h.length != 1 ) {
+ throw new HadoopException( "Invalid Location header." );
+ }
+ String loc = h[0].getValue();
+ HttpPut dn = new HttpPut( loc );
+ HttpEntity e = null;
+ if( text != null ) {
+ e = new StringEntity( text );
+ } else if( file != null ) {
+ e = new FileEntity( new File( file ) );
+ }
+ dn.setEntity( e );
+ return new Response( execute( dn ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
index fb2b2f8..955eb9f 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/hdfs/Rm.java
@@ -25,11 +25,11 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Rm {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
String file;
Boolean recursive;
@@ -52,12 +52,17 @@ class Rm {
return recursive( true );
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Hdfs.SERVICE_PATH, file );
- addQueryParam( uri, "op", "DELETE" );
- addQueryParam( uri, "recursive", recursive );
- HttpDelete request = new HttpDelete( uri.build() );
- return new Response( execute( request ) );
+ public Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Hdfs.SERVICE_PATH, file );
+ addQueryParam( uri, "op", "DELETE" );
+ addQueryParam( uri, "recursive", recursive );
+ HttpDelete request = new HttpDelete( uri.build() );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
index 2a8a410..edc8474 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Hive.java
@@ -28,13 +28,13 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
public class Hive {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
private String group;
private String file;
@@ -65,15 +65,20 @@ public class Hive {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Job.SERVICE_PATH, "/hive" );
- addParam( params, "group", group );
- addParam( params, "file", file );
- addParam( params, "statusdir", statusDir );
- UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
- HttpPost request = new HttpPost( uri.build() );
- request.setEntity( form );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Job.SERVICE_PATH, "/hive" );
+ addParam( params, "group", group );
+ addParam( params, "file", file );
+ addParam( params, "statusdir", statusDir );
+ UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+ HttpPost request = new HttpPost( uri.build() );
+ request.setEntity( form );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
index dc416ed..16b2ac4 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Java.java
@@ -32,10 +32,11 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
class Java {
- static class Request extends AbstractRequest {
+ static class Request extends AbstractRequest<Response> {
String jar;
String app;
@@ -66,17 +67,22 @@ class Java {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Job.SERVICE_PATH, "/mapreduce/jar" );
- List<NameValuePair> params = new ArrayList<NameValuePair>();
- params.add( new BasicNameValuePair( "jar", jar ) );
- params.add( new BasicNameValuePair( "class", app ) );
- params.add( new BasicNameValuePair( "arg", input ) );
- params.add( new BasicNameValuePair( "arg", output ) );
- UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
- HttpPost request = new HttpPost( uri.build() );
- request.setEntity( form );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Job.SERVICE_PATH, "/mapreduce/jar" );
+ List<NameValuePair> params = new ArrayList<NameValuePair>();
+ params.add( new BasicNameValuePair( "jar", jar ) );
+ params.add( new BasicNameValuePair( "class", app ) );
+ params.add( new BasicNameValuePair( "arg", input ) );
+ params.add( new BasicNameValuePair( "arg", output ) );
+ UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+ HttpPost request = new HttpPost( uri.build() );
+ request.setEntity( form );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
index afa277d..d87831f 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Pig.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
public class Pig {
@@ -65,17 +66,22 @@ public class Pig {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Job.SERVICE_PATH, "/pig" );
- List<NameValuePair> params = new ArrayList<NameValuePair>();
- addParam( params, "group", group );
- addParam( params, "file", file );
- addParam( params, "arg", arg );
- addParam( params, "statusdir", statusDir );
- UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
- HttpPost request = new HttpPost( uri.build() );
- request.setEntity( form );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Job.SERVICE_PATH, "/pig" );
+ List<NameValuePair> params = new ArrayList<NameValuePair>();
+ addParam( params, "group", group );
+ addParam( params, "file", file );
+ addParam( params, "arg", arg );
+ addParam( params, "statusdir", statusDir );
+ UrlEncodedFormEntity form = new UrlEncodedFormEntity( params );
+ HttpPost request = new HttpPost( uri.build() );
+ request.setEntity( form );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
index 874694e..baf2bc3 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Queue.java
@@ -26,6 +26,7 @@ import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Queue {
@@ -35,10 +36,15 @@ class Queue {
super( hadoop );
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Job.SERVICE_PATH, "/queue" );
- HttpGet request = new HttpGet( uri.build() );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Job.SERVICE_PATH, "/queue" );
+ HttpGet request = new HttpGet( uri.build() );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
index c3fdddf..5f8cb56 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/job/Status.java
@@ -26,6 +26,7 @@ import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Status {
@@ -42,10 +43,15 @@ class Status {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Job.SERVICE_PATH, "/queue/", jobId );
- HttpGet request = new HttpGet( uri.build() );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Job.SERVICE_PATH, "/queue/", jobId );
+ HttpGet request = new HttpGet( uri.build() );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
index 6b9d196..431f7c3 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Status.java
@@ -24,8 +24,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
-import java.io.IOException;
-import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Status {
@@ -42,10 +41,15 @@ class Status {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Workflow.SERVICE_PATH, "/job/", jobId );
- HttpGet request = new HttpGet( uri.build() );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Workflow.SERVICE_PATH, "/job/", jobId );
+ HttpGet request = new HttpGet( uri.build() );
+ return new Response( execute( request ) );
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/511fae36/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
----------------------------------------------------------------------
diff --git a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
index ee72eae..2406e53 100644
--- a/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
+++ b/gateway-shell/src/main/java/org/apache/hadoop/gateway/shell/workflow/Submit.java
@@ -32,6 +32,7 @@ import org.apache.http.entity.StringEntity;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
class Submit {
@@ -60,18 +61,23 @@ class Submit {
return this;
}
- public Response now() throws IOException, URISyntaxException {
- URIBuilder uri = uri( Workflow.SERVICE_PATH, "/jobs" );
- addQueryParam( uri, "action", action );
- HttpPost request = new HttpPost( uri.build() );
- HttpEntity entity = null;
- if( text != null ) {
- entity = new StringEntity( text, ContentType.create( "application/xml", "UTF-8" ) );
- } else if( file != null ) {
- entity = new FileEntity( new File( file ), ContentType.create( "application/xml" ) );
- }
- request.setEntity( entity );
- return new Response( execute( request ) );
+ protected Callable<Response> callable() {
+ return new Callable<Response>() {
+ @Override
+ public Response call() throws Exception {
+ URIBuilder uri = uri( Workflow.SERVICE_PATH, "/jobs" );
+ addQueryParam( uri, "action", action );
+ HttpPost request = new HttpPost( uri.build() );
+ HttpEntity entity = null;
+ if( text != null ) {
+ entity = new StringEntity( text, ContentType.create( "application/xml", "UTF-8" ) );
+ } else if( file != null ) {
+ entity = new FileEntity( new File( file ), ContentType.create( "application/xml" ) );
+ }
+ request.setEntity( entity );
+ return new Response( execute( request ) );
+ }
+ };
}
}