You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@metron.apache.org by GitBox <gi...@apache.org> on 2019/07/10 20:41:07 UTC

[GitHub] [metron] mmiklavc commented on a change in pull request #1440: METRON-2148: Stellar REST POST function

mmiklavc commented on a change in pull request #1440: METRON-2148: Stellar REST POST function
URL: https://github.com/apache/metron/pull/1440#discussion_r302261582
 
 

 ##########
 File path: metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
 ##########
 @@ -168,241 +195,399 @@ public Object apply(List<Object> args, Context context) throws ParseException {
 
     @Override
     public void close() throws IOException {
-      if (httpClient != null) {
-        httpClient.close();
-      }
-      if (scheduledExecutorService != null) {
-        scheduledExecutorService.shutdown();
-      }
+      closeHttpClient();
+      closeExecutorService();
     }
 
-    /**
-     * Retrieves the ClosableHttpClient from a pooling connection manager.
-     *
-     * @param context The execution context.
-     * @return A ClosableHttpClient.
-     */
-    protected CloseableHttpClient getHttpClient(Context context) {
-      RestConfig restConfig = getRestConfig(Collections.emptyList(), getGlobalConfig(context));
-
-      PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
+    private HttpGet buildGetRequest(String uri, Map<String, Object> queryParameters) throws URISyntaxException {
+      HttpGet httpGet = new HttpGet(getURI(uri, queryParameters));
+      httpGet.addHeader("Accept", "application/json");
 
-      return HttpClients.custom()
-              .setConnectionManager(cm)
-              .build();
+      return httpGet;
     }
+  }
 
-    protected PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
-      PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
-      if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
-        cm.setMaxTotal(restConfig.getPoolingMaxTotal());
-      }
-      if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
-        cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
-      }
-      return cm;
-    }
+  @Stellar(
+          namespace = "REST",
+          name = "POST",
+          description = "Performs a REST POST request and parses the JSON results into a map.",
+          params = {
+                  "url - URI to the REST service",
+                  "post_data - POST data that will be sent in the POST request.  Must be well-formed JSON unless the 'enforce.json' property is set to false.",
+                  "rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+                          "of the same name. Default is the empty Map, meaning no overrides.",
+                  "query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters"
+
+          },
+          returns = "JSON results as a Map")
+  public static class RestPost implements StellarFunction {
 
     /**
-     * Only used for testing.
-     * @param httpClient
+     * Whether the function has been initialized.
      */
-    protected void setHttpClient(CloseableHttpClient httpClient) {
-      this.httpClient = httpClient;
-    }
+    private boolean initialized = false;
 
     /**
-     * Perform the HttpClient get and handle the results.  A configurable list of status codes are accepted and the
-     * response content (expected to be json) is parsed into a Map.  Values returned on errors and when response content
-     * is also configurable.  The rest config "timeout" setting is imposed in this method and will abort the get request
-     * if exceeded.
-     *
-     * @param restConfig
-     * @param httpGet
-     * @param httpClientContext
-     * @return
-     * @throws IOException
+     * Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
+     * Stellar context.
+     * @param context
      */
-    protected Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException {
+    @Override
+    public void initialize(Context context) {
+      initializeExecutorService();
+      initializeHttpClient(context);
+      initialized = true;
+    }
 
-      // Schedule a command to abort the httpGet request if the timeout is exceeded
-      ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
-      CloseableHttpResponse response;
-      try {
-        response = httpClient.execute(httpGet, httpClientContext);
-      } catch(Exception e) {
-        // Report a timeout if the httpGet request was aborted.  Otherwise rethrow exception.
-        if (httpGet.isAborted()) {
-          throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpGet.getURI().toString(), restConfig.getTimeout()));
-        } else {
-          throw e;
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      String uriString = getArg(0, String.class, args);
+      Object dataObject = getArg(1, Object.class, args);
+      Map<String, Object> functionRestConfig = null;
+      Map<String, Object> queryParameters = new HashMap<>();
+      if (args.size() > 2) {
+        functionRestConfig = getArg(2, Map.class, args);
+        if (args.size() == 4) {
+          queryParameters = getArg(3, Map.class, args);
         }
       }
 
-      // Cancel the future if the request finished within the timeout
-      if (!scheduledFuture.isDone()) {
-        scheduledFuture.cancel(true);
-      }
-      int statusCode = response.getStatusLine().getStatusCode();
-      LOG.debug("request = {}; response = {}", httpGet, response);
-      if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
-        HttpEntity httpEntity = response.getEntity();
-
-        // Parse the response if present, return the empty value override if not
-        Optional<Object> parsedResponse = parseResponse(restConfig, httpGet, httpEntity);
-        return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
-      } else {
-        throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
-                "failed with http status code %d: %s",
-                httpGet.getURI().toString(),
-                restConfig.getResponseCodesAllowed().toString(),
-                statusCode,
-                EntityUtils.toString(response.getEntity())));
+      // Build the RestConfig by applying settins in order of precedence
+      Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
+      Map<String, Object> postRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_POST_SETTINGS);
+      RestConfig restConfig = buildRestConfig(globalRestConfig, postRestConfig, functionRestConfig);
+
+      try {
+        HttpPost httpPost = buildPostRequest(restConfig, uriString, dataObject, queryParameters);
+        return executeRequest(restConfig, httpPost);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return restConfig.getErrorValueOverride();
       }
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getGlobalConfig(Context context) {
-      Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
-      return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
+    @Override
+    public void close() throws IOException {
+      closeHttpClient();
+      closeExecutorService();
     }
 
-    /**
-     * Build the RestConfig object using the following order of precedence:
-     * <ul>
-     *   <li>rest config supplied as an expression parameter</li>
-     *   <li>rest config stored in the global config</li>
-     *   <li>default rest config</li>
-     * </ul>
-     * Only settings specified in the rest config will override lower priority config settings.
-     * @param args
-     * @param globalConfig
-     * @return
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    protected RestConfig getRestConfig(List<Object> args, Map<String, Object> globalConfig) {
-      Map<String, Object> globalRestConfig = (Map<String, Object>) globalConfig.get(STELLAR_REST_SETTINGS);
-      Map<String, Object> functionRestConfig = null;
-      if (args.size() > 1) {
-        functionRestConfig = getArg(1, Map.class, args);
-      }
+    private HttpPost buildPostRequest(RestConfig restConfig, String uriString, Object dataObject, Map<String, Object> queryParameters) throws JsonProcessingException, URISyntaxException, UnsupportedEncodingException {
+      String body = getPostData(restConfig, dataObject);
 
-      // Add settings in order of precedence
-      RestConfig restConfig = new RestConfig();
-      if (globalRestConfig != null) {
-        restConfig.putAll(globalRestConfig);
-      }
-      if (functionRestConfig != null) {
-        restConfig.putAll(functionRestConfig);
-      }
-      return restConfig;
-    }
+      URI uri = getURI(uriString, queryParameters);
+      HttpPost httpPost = new HttpPost(uri);
+      httpPost.setEntity(new StringEntity(body));
+      httpPost.addHeader("Accept", "application/json");
+      httpPost.addHeader("Content-type", "application/json");
 
-    /**
-     * Returns the proxy HttpHost object if the proxy rest config settings are set.
-     * @param restConfig
-     * @return
-     */
-    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
-      Optional<HttpHost> proxy = Optional.empty();
-      if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
-        proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
-      }
-      return proxy;
+      return httpPost;
     }
 
     /**
-     * Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
-     * @param restConfig
-     * @param proxy
-     * @return
+     * Serializes the supplied POST data to be sent in the POST request.  Checks for well-formed JSON by default unless 'enforce.json' is set to false.
+     * @param restConfig RestConfig
+     * @param arg POST data
+     * @return Serialized POST data
+     * @throws JsonProcessingException
      */
-    protected RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
-      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
-      if (restConfig.getConnectTimeout() != null) {
-        requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
-      }
-      if (restConfig.getConnectionRequestTimeout() != null) {
-        requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+    private String getPostData(RestConfig restConfig, Object arg) throws JsonProcessingException {
+      String data = "";
+      if (arg == null) {
+        return data;
       }
-      if (restConfig.getSocketTimeout() != null) {
-        requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
+      if (arg instanceof Map) {
+        data = JSONUtils.INSTANCE.toJSON(arg, false);
+      } else {
+        data = arg.toString();
+        if (restConfig.enforceJson()) {
+          try {
+            JSONUtils.INSTANCE.toJSONObject(data);
+          } catch (org.json.simple.parser.ParseException e) {
+            throw new IllegalArgumentException(String.format("POST data '%s' must be properly formatted JSON.  " +
+                    "Set the '%s' property to false to disable this check.", data, RestConfig.ENFORCE_JSON));
+          }
+        }
       }
+      return data;
+    }
+  }
 
-      proxy.ifPresent(requestConfigBuilder::setProxy);
-      return requestConfigBuilder.build();
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("Expected at least %d argument(s), found %d", index+1, args.size()));
     }
 
-    /**
-     * Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
-     * necessary rest config settings are configured.  Passwords are stored in HDFS.
-     * @param restConfig
-     * @param target
-     * @param proxy
-     * @return
-     * @throws IOException
-     */
-    protected HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
-      HttpClientContext httpClientContext = HttpClientContext.create();
-      boolean credentialsAdded = false;
-      CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-
-      // Add the basic auth credentials if the rest config settings are present
-      if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(target),
-                new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
-        credentialsAdded = true;
-      }
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Retrieves the ClosableHttpClient from a pooling connection manager.
+   *
+   * @param context The execution context.
+   * @return A ClosableHttpClient.
+   */
+  protected static CloseableHttpClient getHttpClient(Context context) {
+    RestConfig restConfig = buildRestConfig(getGlobalConfig(context));
+
+    PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
+
+    return HttpClients.custom()
+            .setConnectionManager(cm)
+            .build();
+  }
+
+  protected static PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
+    PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+    if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
+      cm.setMaxTotal(restConfig.getPoolingMaxTotal());
+    }
+    if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
+      cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
+    }
+    return cm;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getGlobalConfig(Context context) {
+    Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
+    return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
+  }
 
-      // Add the proxy basic auth credentials if the rest config settings are present
-      if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
-              restConfig.getProxyBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(proxy.get()),
-                new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
-        credentialsAdded = true;
+  /**
+   * Build the RestConfig by applying settings in order of precedence (last item in the input list has highest priority).
+   * Only settings specified in the rest config will override lower priority config settings.
+   * @param configs
+   * @return
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  protected static RestConfig buildRestConfig(Map<String, Object>... configs) {
+    RestConfig restConfig = new RestConfig();
+
+    // Add settings in order of precedence
+    for(Map<String, Object> config: configs) {
+      if (config != null) {
+        restConfig.putAll(config);
       }
-      if (credentialsAdded) {
-        httpClientContext.setCredentialsProvider(credentialsProvider);
+    }
+    return restConfig;
+  }
+
+  /**
+   * Builds a URI from the supplied URI string and adds query parameters.
+   * @param uriString
+   * @param queryParameters
+   * @return
+   * @throws URISyntaxException
+   */
+  private static URI getURI(String uriString, Map<String, Object> queryParameters) throws URISyntaxException {
+    URIBuilder uriBuilder = new URIBuilder(uriString);
+    if (queryParameters != null) {
+      for(Map.Entry<String, Object> entry: queryParameters.entrySet()) {
+        uriBuilder.setParameter(entry.getKey(), (String) entry.getValue());
       }
-      return httpClientContext;
     }
+    return uriBuilder.build();
+  }
+
+  /**
+   * Returns the proxy HttpHost object if the proxy rest config settings are set.
+   * @param restConfig
+   * @return
+   */
+  protected static Optional<HttpHost> getProxy(RestConfig restConfig) {
+    Optional<HttpHost> proxy = Optional.empty();
+    if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
+      proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
+    }
+    return proxy;
+  }
 
-    protected Optional<Object> parseResponse(RestConfig restConfig, HttpGet httpGet, HttpEntity httpEntity) throws IOException {
-      Optional<Object> parsedResponse = Optional.empty();
-      if (httpEntity != null) {
-        int actualContentLength = 0;
-        String json = EntityUtils.toString(httpEntity);
-        if (json != null && !json.isEmpty()) {
-          actualContentLength = json.length();
-          parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER));
-        }
-        if (restConfig.verifyContentLength() && actualContentLength != httpEntity.getContentLength()) {
-          throw new IOException(String.format("Stellar REST request to %s returned incorrect or missing content length. " +
-                          "Content length in the response was %d but the actual body content length was %d.",
-                  httpGet.getURI().toString(),
-                  httpEntity.getContentLength(),
-                  actualContentLength));
-        }
+  /**
+   * Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
+   * @param restConfig
+   * @param proxy
+   * @return
+   */
+  protected static RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
+    RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+    if (restConfig.getConnectTimeout() != null) {
+      requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
+    }
+    if (restConfig.getConnectionRequestTimeout() != null) {
+      requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+    }
+    if (restConfig.getSocketTimeout() != null) {
+      requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
+    }
+
+    proxy.ifPresent(requestConfigBuilder::setProxy);
+    return requestConfigBuilder.build();
+  }
+
+  /**
+   * Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
+   * necessary rest config settings are configured.  Passwords are stored in HDFS.
+   * @param restConfig
+   * @param target
+   * @param proxy
+   * @return
+   * @throws IOException
+   */
+  protected static HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
+    HttpClientContext httpClientContext = HttpClientContext.create();
+    boolean credentialsAdded = false;
+    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+    // Add the basic auth credentials if the rest config settings are present
+    if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(target),
+              new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
+      credentialsAdded = true;
+    }
+
+    // Add the proxy basic auth credentials if the rest config settings are present
+    if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
+            restConfig.getProxyBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(proxy.get()),
+              new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
+      credentialsAdded = true;
+    }
+    if (credentialsAdded) {
+      httpClientContext.setCredentialsProvider(credentialsProvider);
+    }
+    return httpClientContext;
+  }
+
+  /**
+   * Read bytes from a HDFS path.
+   * @param inPath
+   * @return
+   * @throws IOException
+   */
+  private static byte[] readBytes(Path inPath) throws IOException {
+    FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
+    try (FSDataInputStream inputStream = fs.open(inPath)) {
+      return IOUtils.toByteArray(inputStream);
+    }
+  }
+
+  /**
+   * Perform the HttpClient request and handle the results.  A configurable list of status codes are accepted and the
+   * response content (expected to be json) is parsed into a Map.  Values returned on errors and when response content
+   * is also configurable.  The rest config "timeout" setting is imposed in this method and will abort the get request
+   * if exceeded.
+   *
+   * @param restConfig
+   * @param httpRequestBase
+   * @return
+   * @throws IOException
+   */
+  protected static Object executeRequest(RestConfig restConfig, HttpRequestBase httpRequestBase) throws IOException {
+    URI uri = httpRequestBase.getURI();
+    HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
+    Optional<HttpHost> proxy = getProxy(restConfig);
+    HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
+    httpRequestBase.setConfig(getRequestConfig(restConfig, proxy));
+
+    // Schedule a command to abort the request if the timeout is exceeded
+    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpRequestBase::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
 
 Review comment:
   Why do we need to manage timeouts on our end? e.g. https://www.baeldung.com/httpclient-timeout

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services