You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vb...@apache.org on 2014/01/27 17:41:29 UTC

git commit: AMBARI-4283. Pass Through API for API forwarding from the Ambari Server (Falcon/Jobs API).(vbrodetskyi)

Updated Branches:
  refs/heads/trunk b0add1ce4 -> 34bf12a0d


AMBARI-4283. Pass Through API for API forwarding from the Ambari Server (Falcon/Jobs API).(vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/34bf12a0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/34bf12a0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/34bf12a0

Branch: refs/heads/trunk
Commit: 34bf12a0d86963ecb2e93b5cf8867b3a271ae72c
Parents: b0add1c
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Mon Jan 27 18:39:09 2014 +0200
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Mon Jan 27 18:39:09 2014 +0200

----------------------------------------------------------------------
 .../server/api/services/ProxyService.java       | 107 ++++++++++
 .../controller/internal/URLStreamProvider.java  |  41 ++--
 .../server/api/services/ProxyServiceTest.java   | 210 +++++++++++++++++++
 3 files changed, 340 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/34bf12a0/ambari-server/src/main/java/org/apache/ambari/server/api/services/ProxyService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/ProxyService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ProxyService.java
new file mode 100644
index 0000000..ce933a9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ProxyService.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services;
+
+import com.google.gson.Gson;
+import org.apache.ambari.server.controller.internal.URLStreamProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import javax.ws.rs.Path;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Map;
+
+@Path("/proxy/")
+public class ProxyService {
+
+  private static final int REPO_URL_CONNECT_TIMEOUT = 3000;
+  private static final int REPO_URL_READ_TIMEOUT = 500;
+  private static final int HTTP_ERROR_RANGE_START = 400;
+
+  private static final String REQUEST_TYPE_GET = "GET";
+  private static final String REQUEST_TYPE_POST = "POST";
+  private static final String REQUEST_TYPE_PUT = "PUT";
+  private static final String REQUEST_TYPE_DELETE = "DELETE";
+  private static final String QUERY_PARAMETER_URL = "url";
+  private static final String ERROR_PROCESSING_URL = "Error occurred during processing URL ";
+
+  private final static Logger LOG = LoggerFactory.getLogger(ProxyService.class);
+
+  @GET
+  public Response processGetRequestForwarding(@Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(REQUEST_TYPE_GET, ui, null);
+  }
+
+  @POST
+  public Response processPostRequestForwarding(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(REQUEST_TYPE_POST, ui, body);
+  }
+
+  @PUT
+  public Response processPutRequestForwarding(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(REQUEST_TYPE_PUT, ui, body);
+  }
+
+  @DELETE
+  public Response processDeleteRequestForwarding(@Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(REQUEST_TYPE_DELETE, ui, null);
+  }
+
+  private Response handleRequest(String requestType, UriInfo ui, String body) {
+    URLStreamProvider urlStreamProvider = new URLStreamProvider(REPO_URL_CONNECT_TIMEOUT,
+                                                REPO_URL_READ_TIMEOUT, null, null, null);
+    List<String> urlsToForward = ui.getQueryParameters().get(QUERY_PARAMETER_URL);
+    if (!urlsToForward.isEmpty()) {
+      String url = urlsToForward.get(0);
+      try {
+        HttpURLConnection connection = urlStreamProvider.processURL(url, requestType, body);
+        int responseCode = connection.getResponseCode();
+        if (responseCode >= HTTP_ERROR_RANGE_START) {
+          throw new WebApplicationException(connection.getResponseCode());
+        }
+        String contentType = connection.getContentType();
+        Response.ResponseBuilder rb = Response.status(responseCode);
+        if (contentType.indexOf(APPLICATION_JSON) != -1) {
+          rb.entity(new Gson().fromJson(new InputStreamReader(connection.getInputStream()), Map.class));
+        } else {
+          rb.entity(connection.getInputStream());
+        }
+        return rb.type(contentType).build();
+      } catch (IOException e) {
+        LOG.error(ERROR_PROCESSING_URL + url, e);
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/34bf12a0/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java
index 7cbd999..2847c05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java
@@ -77,13 +77,17 @@ public class URLStreamProvider implements StreamProvider {
   
   @Override
   public InputStream readFrom(String spec, String requestMethod, String params) throws IOException {
+    return processURL(spec, requestMethod, params).getInputStream();
+  }
+
+  public HttpURLConnection processURL(String spec, String requestMethod, String params) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("readFrom spec:" + spec);
     }
-    
-    HttpURLConnection connection = spec.startsWith("https") ? 
-        (HttpURLConnection)getSSLConnection(spec)
-        : (HttpURLConnection)getConnection(spec);
+
+    HttpURLConnection connection = spec.startsWith("https") ?
+            (HttpURLConnection)getSSLConnection(spec)
+            : (HttpURLConnection)getConnection(spec);
 
     String appCookie = appCookieManager.getCachedAppCookie(spec);
     if (appCookie != null) {
@@ -94,39 +98,40 @@ public class URLStreamProvider implements StreamProvider {
     connection.setReadTimeout(readTimeout);
     connection.setDoOutput(true);
     connection.setRequestMethod(requestMethod);
-    
-    if (params != null)
+
+    if (params != null) {
       connection.getOutputStream().write(params.getBytes());
-    
+    }
+
     int statusCode = connection.getResponseCode();
     if (statusCode == HttpStatus.SC_UNAUTHORIZED ) {
       String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE);
       if (LOG.isInfoEnabled()) {
         LOG.info("Received WWW-Authentication header:" + wwwAuthHeader + ", for URL:" + spec);
       }
-      if (wwwAuthHeader != null && 
-          wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
+      if (wwwAuthHeader != null &&
+        wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
         //connection.getInputStream().close();
-        connection = spec.startsWith("https") ? 
-            (HttpURLConnection)getSSLConnection(spec)
-            : (HttpURLConnection)getConnection(spec);
+        connection = spec.startsWith("https") ?
+           (HttpURLConnection)getSSLConnection(spec)
+           : (HttpURLConnection)getConnection(spec);
         appCookie = appCookieManager.getAppCookie(spec, true);
         connection.setRequestProperty(COOKIE, appCookie);
         connection.setConnectTimeout(connTimeout);
         connection.setReadTimeout(readTimeout);
         connection.setDoOutput(true);
-        
-        return connection.getInputStream();
+
+        return connection;
       } else {
         // no supported authentication type found
         // we would let the original response propogate
         LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + spec);
-        return connection.getInputStream();
+        return connection;
       }
     } else {
-      // not a 401 Unauthorized status code
-      // we would let the original response propogate
-      return connection.getInputStream();
+        // not a 401 Unauthorized status code
+        // we would let the original response propogate
+        return connection;
     }
   }
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/34bf12a0/ambari-server/src/test/java/org/apache/ambari/server/api/services/ProxyServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/ProxyServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ProxyServiceTest.java
new file mode 100644
index 0000000..057772b
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ProxyServiceTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services;
+
+import com.google.gson.Gson;
+import com.sun.jersey.core.spi.factory.ResponseBuilderImpl;
+import com.sun.jersey.core.spi.factory.ResponseImpl;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+import org.apache.ambari.server.controller.internal.URLStreamProvider;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import static org.junit.Assert.assertSame;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.util.Map;
+import java.util.List;
+import java.util.Collections;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+/**
+ * Unit tests for {@link ProxyService}. The outgoing connection, the
+ * {@link URLStreamProvider} constructor and the static {@link Response}
+ * factory are all mocked, so no network traffic takes place.
+ *
+ * <p>The class is public because JUnit 4 runners reject test classes that
+ * are not.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ProxyServiceTest.class, ProxyService.class, URLStreamProvider.class, Response.class, ResponseBuilderImpl.class })
+public class ProxyServiceTest extends BaseServiceTest {
+
+  private static final String TEST_URL = "testurl";
+  private static final String TEST_BODY = "testbody";
+  private static final String TEXT_PLAIN = "text/plain";
+  private static final String JSON_PAYLOAD = "{ \"test\":\"test\" }";
+
+  /** A GET is forwarded without a body and a plain-text answer is relayed as a stream. */
+  @Test
+  public void testProxyGetRequest() throws Exception {
+    ProxyService ps = new ProxyService();
+    InputStream is = new ByteArrayInputStream("test".getBytes());
+    Response expected = expectSuccessfulForwarding("GET", null, TEXT_PLAIN, is, is);
+    Response resultForGetRequest = ps.processGetRequestForwarding(getHttpHeaders(), getUriInfo());
+    assertSame(expected, resultForGetRequest);
+  }
+
+  /** A POST is forwarded together with its body. */
+  @Test
+  public void testProxyPostRequest() throws Exception {
+    ProxyService ps = new ProxyService();
+    InputStream is = new ByteArrayInputStream("test".getBytes());
+    Response expected = expectSuccessfulForwarding("POST", TEST_BODY, TEXT_PLAIN, is, is);
+    Response resultForPostRequest = ps.processPostRequestForwarding(TEST_BODY, getHttpHeaders(), getUriInfo());
+    assertSame(expected, resultForPostRequest);
+  }
+
+  /** A PUT is forwarded together with its body. */
+  @Test
+  public void testProxyPutRequest() throws Exception {
+    ProxyService ps = new ProxyService();
+    InputStream is = new ByteArrayInputStream("test".getBytes());
+    Response expected = expectSuccessfulForwarding("PUT", TEST_BODY, TEXT_PLAIN, is, is);
+    Response resultForPutRequest = ps.processPutRequestForwarding(TEST_BODY, getHttpHeaders(), getUriInfo());
+    assertSame(expected, resultForPutRequest);
+  }
+
+  /** A DELETE is forwarded without a body. */
+  @Test
+  public void testProxyDeleteRequest() throws Exception {
+    ProxyService ps = new ProxyService();
+    InputStream is = new ByteArrayInputStream("test".getBytes());
+    Response expected = expectSuccessfulForwarding("DELETE", null, TEXT_PLAIN, is, is);
+    Response resultForDeleteRequest = ps.processDeleteRequestForwarding(getHttpHeaders(), getUriInfo());
+    assertSame(expected, resultForDeleteRequest);
+  }
+
+  /** A remote status in the error range surfaces as a {@link WebApplicationException}. */
+  @Test(expected = WebApplicationException.class)
+  public void testResponseWithError() throws Exception {
+    ProxyService ps = new ProxyService();
+    URLStreamProvider streamProviderMock = PowerMock.createNiceMock(URLStreamProvider.class);
+    HttpURLConnection urlConnectionMock = createMock(HttpURLConnection.class);
+    MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+    queryParams.add("url", TEST_URL);
+    expect(getUriInfo().getQueryParameters()).andReturn(queryParams);
+    expect(streamProviderMock.processURL(TEST_URL, "GET", null)).andReturn(urlConnectionMock);
+    // How many times the service reads the status is an implementation detail.
+    expect(urlConnectionMock.getResponseCode()).andReturn(405).anyTimes();
+    PowerMock.expectNew(URLStreamProvider.class, 3000, 500, null, null, null).andReturn(streamProviderMock);
+    PowerMock.replay(streamProviderMock, URLStreamProvider.class);
+    replay(getUriInfo(), urlConnectionMock);
+    ps.processGetRequestForwarding(getHttpHeaders(), getUriInfo());
+  }
+
+  /** A JSON answer is parsed and handed to the response builder as a {@link Map}. */
+  @Test
+  public void testProxyWithJSONResponse() throws Exception {
+    ProxyService ps = new ProxyService();
+    // Parsed from a separate stream; the builder mock matches the entity by equals().
+    Map<?, ?> map = new Gson().fromJson(
+        new InputStreamReader(new ByteArrayInputStream(JSON_PAYLOAD.getBytes())), Map.class);
+    Response expected = expectSuccessfulForwarding("GET", null, "application/json",
+        new ByteArrayInputStream(JSON_PAYLOAD.getBytes()), map);
+    Response resultForGetRequest = ps.processGetRequestForwarding(getHttpHeaders(), getUriInfo());
+    assertSame(expected, resultForGetRequest);
+  }
+
+  /**
+   * Records and replays the expectations for one successfully forwarded
+   * request: the {@code url} query parameter, the mocked
+   * {@link URLStreamProvider} construction, a 200 answer with the given
+   * content type and body, and the response-builder call chain.
+   *
+   * @param requestType    HTTP method the service is expected to forward
+   * @param body           request body expected to be forwarded, or {@code null}
+   * @param contentType    content type reported by the mocked connection
+   * @param remoteStream   body stream returned by the mocked connection
+   * @param expectedEntity entity the service is expected to set on the builder
+   * @return the response mock that the service is expected to return
+   */
+  private Response expectSuccessfulForwarding(String requestType, String body, String contentType,
+                                              InputStream remoteStream, Object expectedEntity) throws Exception {
+    URLStreamProvider streamProviderMock = PowerMock.createNiceMock(URLStreamProvider.class);
+    HttpURLConnection urlConnectionMock = createMock(HttpURLConnection.class);
+    MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+    Response.ResponseBuilder responseBuilderMock = PowerMock.createMock(ResponseBuilderImpl.class);
+    Response responseMock = createMock(ResponseImpl.class);
+    queryParams.add("url", TEST_URL);
+    PowerMock.mockStatic(Response.class);
+    expect(getUriInfo().getQueryParameters()).andReturn(queryParams);
+    expect(streamProviderMock.processURL(TEST_URL, requestType, body)).andReturn(urlConnectionMock);
+    expect(urlConnectionMock.getResponseCode()).andReturn(200);
+    expect(urlConnectionMock.getContentType()).andReturn(contentType);
+    expect(urlConnectionMock.getInputStream()).andReturn(remoteStream);
+    PowerMock.expectNew(URLStreamProvider.class, 3000, 500, null, null, null).andReturn(streamProviderMock);
+    expect(Response.status(200)).andReturn(responseBuilderMock);
+    expect(responseBuilderMock.entity(expectedEntity)).andReturn(responseBuilderMock);
+    expect(responseBuilderMock.type(contentType)).andReturn(responseBuilderMock);
+    expect(responseBuilderMock.build()).andReturn(responseMock);
+    PowerMock.replay(streamProviderMock, URLStreamProvider.class, Response.class, responseBuilderMock);
+    replay(getUriInfo(), urlConnectionMock);
+    return responseMock;
+  }
+
+  /**
+   * Supplies no invocations.
+   *
+   * <p>NOTE(review): presumably this opts out of the generic,
+   * invocation-driven test inherited from BaseServiceTest - confirm against
+   * that class.
+   *
+   * @return an empty list
+   */
+  @Override
+  public List<ServiceTestInvocation> getTestInvocations() throws Exception {
+    return Collections.emptyList();
+  }
+
+}