You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:45 UTC

[flink-statefun] 02/10: [FLINK-17875] [core] Wire in JsonEntities into JsonModule

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 758076b0033bca61185911557041614d7bae0ada
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:13:17 2020 +0800

    [FLINK-17875] [core] Wire in JsonEntities into JsonModule
---
 .../statefun/flink/core/jsonmodule/JsonModule.java | 294 ++-------------------
 .../flink/core/jsonmodule/JsonModuleFactory.java   |  46 ----
 .../flink/core/jsonmodule/JsonServiceLoader.java   |  31 ++-
 .../flink/core/jsonmodule/JsonModuleTest.java      |  10 +-
 4 files changed, 34 insertions(+), 347 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index bd96211..cedeb97 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -18,303 +18,41 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 import static java.lang.String.format;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
 
-import com.google.protobuf.Any;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.net.URL;
-import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.StreamSupport;
-import javax.annotation.Nullable;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.ResourceLocator;
-import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
-import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
-import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
-import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
-import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
-import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
-import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
-import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
-import org.apache.flink.statefun.sdk.EgressType;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.IngressType;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-import org.apache.flink.statefun.sdk.io.Router;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.apache.flink.util.TimeUtils;
 
 final class JsonModule implements StatefulFunctionModule {
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-  private final JsonNode spec;
+
+  /** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */
+  private static final List<JsonEntity> ENTITIES =
+      Arrays.asList(
+          new FunctionJsonEntity(),
+          new IngressJsonEntity(),
+          new RouterJsonEntity(),
+          new EgressJsonEntity());
+
+  private final JsonNode moduleSpecNode;
+  private final FormatVersion formatVersion;
   private final URL moduleUrl;
 
-  public JsonModule(JsonNode spec, URL moduleUrl) {
-    this.spec = Objects.requireNonNull(spec);
+  public JsonModule(JsonNode moduleSpecNode, FormatVersion formatVersion, URL moduleUrl) {
+    this.moduleSpecNode = Objects.requireNonNull(moduleSpecNode);
+    this.formatVersion = Objects.requireNonNull(formatVersion);
     this.moduleUrl = Objects.requireNonNull(moduleUrl);
   }
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      configureFunctions(binder, Selectors.listAt(spec, Pointers.FUNCTIONS_POINTER));
-      configureRouters(binder, Selectors.listAt(spec, Pointers.ROUTERS_POINTER));
-      configureIngress(binder, Selectors.listAt(spec, Pointers.INGRESSES_POINTER));
-      configureEgress(binder, Selectors.listAt(spec, Pointers.EGRESSES_POINTER));
+      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
     }
   }
-
-  private void configureFunctions(Binder binder, Iterable<? extends JsonNode> functions) {
-    final Map<Kind, Map<FunctionType, FunctionSpec>> definedFunctions =
-        StreamSupport.stream(functions.spliterator(), false)
-            .map(JsonModule::parseFunctionSpec)
-            .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
-
-    for (Entry<Kind, Map<FunctionType, FunctionSpec>> entry : definedFunctions.entrySet()) {
-      StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
-      Set<FunctionType> functionTypes = entry.getValue().keySet();
-      for (FunctionType type : functionTypes) {
-        binder.bindFunctionProvider(type, provider);
-      }
-    }
-  }
-
-  private static StatefulFunctionProvider functionProvider(
-      Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
-    switch (kind) {
-      case HTTP:
-        return new HttpFunctionProvider(
-            transformValues(definedFunctions, HttpFunctionSpec.class::cast));
-      case GRPC:
-        return new GrpcFunctionProvider(
-            transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
-      default:
-        throw new IllegalStateException("Unexpected value: " + kind);
-    }
-  }
-
-  private void configureRouters(Binder binder, Iterable<? extends JsonNode> routerNodes) {
-    for (JsonNode router : routerNodes) {
-      // currently the only type of router supported in a module.yaml, is a protobuf dynamicMessage
-      // router once we will introduce further router types we should refactor this to be more
-      // dynamic.
-      requireProtobufRouterType(router);
-
-      IngressIdentifier<Message> id = targetRouterIngress(router);
-      binder.bindIngressRouter(id, dynamicRouter(router));
-    }
-  }
-
-  private void configureIngress(Binder binder, Iterable<? extends JsonNode> ingressNode) {
-    for (JsonNode ingress : ingressNode) {
-      IngressIdentifier<Message> id = ingressId(ingress);
-      IngressType type = ingressType(ingress);
-
-      JsonIngressSpec<Message> ingressSpec = new JsonIngressSpec<>(type, id, ingress);
-      binder.bindIngress(ingressSpec);
-
-      if (isAutoRoutableIngress(type)) {
-        binder.bindIngressRouter(id, new AutoRoutableProtobufRouter());
-      }
-    }
-  }
-
-  private void configureEgress(Binder binder, Iterable<? extends JsonNode> egressNode) {
-    for (JsonNode egress : egressNode) {
-      EgressIdentifier<Any> id = egressId(egress);
-      EgressType type = egressType(egress);
-
-      JsonEgressSpec<Any> egressSpec = new JsonEgressSpec<>(type, id, egress);
-      binder.bindEgress(egressSpec);
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Ingresses
-  // ----------------------------------------------------------------------------------------------------------
-  private static IngressType ingressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
-    return new IngressType(nn.namespace(), nn.name());
-  }
-
-  private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
-    String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
-    NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
-    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
-  }
-
-  private static boolean isAutoRoutableIngress(IngressType ingressType) {
-    return ingressType.equals(ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE)
-        || ingressType.equals(PolyglotKinesisIOTypes.ROUTABLE_PROTOBUF_KINESIS_INGRESS_TYPE);
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Egresses
-  // ----------------------------------------------------------------------------------------------------------
-  private static EgressType egressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(typeString);
-    return new EgressType(nn.namespace(), nn.name());
-  }
-
-  private static EgressIdentifier<Any> egressId(JsonNode spec) {
-    String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
-    NamespaceNamePair nn = NamespaceNamePair.from(egressId);
-    return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Routers
-  // ----------------------------------------------------------------------------------------------------------
-  private static Router<Message> dynamicRouter(JsonNode router) {
-    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
-    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
-    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
-
-    ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
-    Optional<Descriptors.GenericDescriptor> maybeDescriptor =
-        descriptorPath.getDescriptorByName(messageType);
-    if (!maybeDescriptor.isPresent()) {
-      throw new IllegalStateException(
-          "Error while processing a router definition. Unable to locate a message "
-              + messageType
-              + " in a descriptor set "
-              + descriptorSetPath);
-    }
-    return ProtobufRouter.forAddressTemplate(
-        (Descriptors.Descriptor) maybeDescriptor.get(), addressTemplate);
-  }
-
-  private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
-    try {
-      URL url = ResourceLocator.findNamedResource(descriptorSetPath);
-      if (url == null) {
-        throw new IllegalArgumentException(
-            "Unable to locate a Protobuf descriptor set at " + descriptorSetPath);
-      }
-      return ProtobufDescriptorMap.from(url);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "Error while processing a router definition. Unable to read the descriptor set at  "
-              + descriptorSetPath,
-          e);
-    }
-  }
-
-  private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
-    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
-    NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
-    return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
-  }
-
-  private static void requireProtobufRouterType(JsonNode routerNode) {
-    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
-    if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
-      throw new IllegalStateException("Invalid router type " + routerType);
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------------------
-  // Functions
-  // ----------------------------------------------------------------------------------------------------------
-
-  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
-    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
-    FunctionSpec.Kind kind = Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
-    FunctionType functionType = functionType(functionNode);
-    switch (kind) {
-      case HTTP:
-        return new HttpFunctionSpec(
-            functionType,
-            functionUri(functionNode),
-            functionStates(functionNode),
-            maxRequestDuration(functionNode),
-            maxNumBatchRequests(functionNode));
-      case GRPC:
-        return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
-      default:
-        throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
-    }
-  }
-
-  private static int maxNumBatchRequests(JsonNode functionNode) {
-    return Selectors.optionalIntegerAt(functionNode, Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
-        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
-  }
-
-  private static Duration maxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT)
-        .map(TimeUtils::parseDuration)
-        .orElse(DEFAULT_HTTP_TIMEOUT);
-  }
-
-  private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
-  }
-
-  private static FunctionType functionType(JsonNode functionNode) {
-    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
-    return new FunctionType(nn.namespace(), nn.name());
-  }
-
-  private static InetSocketAddress functionAddress(JsonNode functionNode) {
-    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
-    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
-    return new InetSocketAddress(host, port);
-  }
-
-  private static URI functionUri(JsonNode functionNode) {
-    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
-    URI typedUri = URI.create(uri);
-    @Nullable String scheme = typedUri.getScheme();
-    if (scheme == null) {
-      throw new IllegalArgumentException(
-          "Missing scheme in function endpoint "
-              + uri
-              + "; an http or https scheme must be provided.");
-    }
-    if (scheme.equalsIgnoreCase("http")
-        || scheme.equalsIgnoreCase("https")
-        || scheme.equalsIgnoreCase("http+unix")
-        || scheme.equalsIgnoreCase("https+unix")) {
-      return typedUri;
-    }
-    throw new IllegalArgumentException(
-        "Missing scheme in function endpoint "
-            + uri
-            + "; an http or https or http+unix or https+unix scheme must be provided.");
-  }
-
-  private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
-    return toMap(FunctionSpec::functionType, Function.identity());
-  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
deleted file mode 100644
index 3616bcd..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import java.net.URL;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-final class JsonModuleFactory {
-
-  private JsonModuleFactory() {}
-
-  static StatefulFunctionModule create(JsonNode root, URL moduleUrl) {
-    final FormatVersion formatVersion = formatVersion(root);
-    final JsonNode spec = root.at(Pointers.MODULE_SPEC);
-
-    switch (formatVersion) {
-      case v1_0:
-        return new JsonModule(spec, moduleUrl);
-      default:
-        throw new IllegalArgumentException("Unrecognized format version: " + formatVersion);
-    }
-  }
-
-  private static FormatVersion formatVersion(JsonNode root) {
-    String versionStr = Selectors.textAt(root, Pointers.FORMAT_VERSION);
-    return FormatVersion.fromString(versionStr);
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index 0ac7712..84e3eba 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -26,6 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.core.spi.Constants;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
@@ -45,8 +46,9 @@ public final class JsonServiceLoader {
   @VisibleForTesting
   static StatefulFunctionModule fromUrl(ObjectMapper mapper, URL moduleUrl) {
     try {
-      JsonNode root = readAndValidateModuleTree(mapper, moduleUrl);
-      return JsonModuleFactory.create(root, moduleUrl);
+      final JsonNode root = readAndValidateModuleTree(mapper, moduleUrl);
+      return new JsonModule(
+          requireValidModuleSpecNode(moduleUrl, root), requireValidFormatVersion(root), moduleUrl);
     } catch (Throwable t) {
       throw new RuntimeException("Failed loading a module at " + moduleUrl, t);
     }
@@ -55,21 +57,13 @@ public final class JsonServiceLoader {
   /**
    * Read a {@code StatefulFunction} module definition.
    *
-   * <p>A valid resource module definition has to contain the following sections:
-   *
-   * <ul>
-   *   <li>meta - contains the metadata associated with this module, such as its type.
-   *   <li>spec - a specification of the module. i.e. the definied functions, routers etc'.
-   * </ul>
-   *
-   * <p>If any of these sections are missing, this would be considered an invalid module definition,
-   * in addition a type is a mandatory field of a module spec.
+   * <p>A valid resource module definition has to contain the metadata associated with this module,
+   * such as its type.
    */
   private static JsonNode readAndValidateModuleTree(ObjectMapper mapper, URL moduleYamlFile)
       throws IOException {
     JsonNode root = mapper.readTree(moduleYamlFile);
     validateMeta(moduleYamlFile, root);
-    validateSpec(moduleYamlFile, root);
     return root;
   }
 
@@ -87,10 +81,19 @@ public final class JsonServiceLoader {
     }
   }
 
-  private static void validateSpec(URL moduleYamlFile, JsonNode root) {
-    if (root.at(Pointers.MODULE_SPEC).isMissingNode()) {
+  private static JsonNode requireValidModuleSpecNode(URL moduleYamlFile, JsonNode root) {
+    final JsonNode moduleSpecNode = root.at(Pointers.MODULE_SPEC);
+
+    if (moduleSpecNode.isMissingNode()) {
       throw new IllegalStateException("A module without a spec at " + moduleYamlFile);
     }
+
+    return moduleSpecNode;
+  }
+
+  private static FormatVersion requireValidFormatVersion(JsonNode root) {
+    final String formatVersion = Selectors.textAt(root, Pointers.FORMAT_VERSION);
+    return FormatVersion.fromString(formatVersion);
   }
 
   @VisibleForTesting
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index f1f3aa1..28ee1fb 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -22,10 +22,8 @@ import static org.junit.Assert.assertThat;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.Message;
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -98,13 +96,7 @@ public class JsonModuleTest {
     URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
     assertThat(moduleUrl, not(nullValue()));
     ObjectMapper mapper = JsonServiceLoader.mapper();
-    final JsonNode json;
-    try {
-      json = mapper.readTree(moduleUrl);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return JsonModuleFactory.create(json, moduleUrl);
+    return JsonServiceLoader.fromUrl(mapper, moduleUrl);
   }
 
   private static StatefulFunctionsUniverse emptyUniverse() {