You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 11:41:47 UTC

[flink-statefun] 04/10: [FLINK-17875] [core] Move JSON Pointers to individual entities

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1fa4664cdc29c93d7ec4b30f37de712d18505140
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu May 28 14:36:53 2020 +0800

    [FLINK-17875] [core] Move JSON Pointers to individual entities
    
    Now that the refactoring has moved the responsibility of parsing each
    entity into the individual JsonEntity implementations, it also makes
    sense not to keep all JSON Pointers in a single file, but instead to
    move them closer to the JsonEntities that actually use them.
---
 .../flink/core/jsonmodule/EgressJsonEntity.java    | 16 ++--
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 37 ++++++---
 .../flink/core/jsonmodule/IngressJsonEntity.java   | 14 +++-
 .../flink/core/jsonmodule/JsonServiceLoader.java   | 11 ++-
 .../statefun/flink/core/jsonmodule/Pointers.java   | 87 ----------------------
 .../flink/core/jsonmodule/RouterJsonEntity.java    | 26 +++++--
 6 files changed, 77 insertions(+), 114 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
index 766d936..813b740 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
-import static org.apache.flink.statefun.flink.core.jsonmodule.Pointers.EGRESSES_POINTER;
-
 import com.google.protobuf.Any;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -31,10 +30,17 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class EgressJsonEntity implements JsonEntity {
 
+  private static final JsonPointer EGRESS_SPECS_POINTER = JsonPointer.compile("/egresses");
+
+  private static final class MetaPointers {
+    private static final JsonPointer ID = JsonPointer.compile("/egress/meta/id");
+    private static final JsonPointer TYPE = JsonPointer.compile("/egress/meta/type");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> egressNodes =
-        Selectors.listAt(moduleSpecRootNode, EGRESSES_POINTER);
+        Selectors.listAt(moduleSpecRootNode, EGRESS_SPECS_POINTER);
 
     egressNodes.forEach(
         egressNode -> {
@@ -44,13 +50,13 @@ final class EgressJsonEntity implements JsonEntity {
   }
 
   private static EgressType egressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Egress.META_TYPE);
+    String typeString = Selectors.textAt(spec, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(typeString);
     return new EgressType(nn.namespace(), nn.name());
   }
 
   private static EgressIdentifier<Any> egressId(JsonNode spec) {
-    String egressId = Selectors.textAt(spec, Pointers.Egress.META_ID);
+    String egressId = Selectors.textAt(spec, MetaPointers.ID);
     NamespaceNamePair nn = NamespaceNamePair.from(egressId);
     return new EgressIdentifier<>(nn.namespace(), nn.name(), Any.class);
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 7dc5173..9b11f00 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -50,6 +51,23 @@ import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
 
+  private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions");
+
+  private static final class MetaPointers {
+    private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind");
+    private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type");
+  }
+
+  private static final class SpecPointers {
+    private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host");
+    private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
+    private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
+    private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
+    private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
+    private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
+        JsonPointer.compile("/function/spec/maxNumBatchRequests");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
@@ -72,11 +90,11 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
-    return Selectors.listAt(moduleSpecRootNode, Pointers.FUNCTIONS_POINTER);
+    return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
   }
 
   private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
-    String functionKind = Selectors.textAt(functionNode, Pointers.Functions.META_KIND);
+    String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
     FunctionSpec.Kind kind =
         FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
     FunctionType functionType = functionType(functionNode);
@@ -100,33 +118,32 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES);
+    return Selectors.textListAt(functionNode, SpecPointers.STATES);
   }
 
   private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
-    return Selectors.optionalIntegerAt(
-        functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS);
+    return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
   }
 
   private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT)
+    return Selectors.optionalTextAt(functionNode, SpecPointers.TIMEOUT)
         .map(TimeUtils::parseDuration);
   }
 
   private static FunctionType functionType(JsonNode functionNode) {
-    String namespaceName = Selectors.textAt(functionNode, Pointers.Functions.META_TYPE);
+    String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
     return new FunctionType(nn.namespace(), nn.name());
   }
 
   private static InetSocketAddress functionAddress(JsonNode functionNode) {
-    String host = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_HOSTNAME);
-    int port = Selectors.integerAt(functionNode, Pointers.Functions.FUNCTION_PORT);
+    String host = Selectors.textAt(functionNode, SpecPointers.HOSTNAME);
+    int port = Selectors.integerAt(functionNode, SpecPointers.PORT);
     return new InetSocketAddress(host, port);
   }
 
   private static URI functionUri(JsonNode functionNode) {
-    String uri = Selectors.textAt(functionNode, Pointers.Functions.FUNCTION_ENDPOINT);
+    String uri = Selectors.textAt(functionNode, SpecPointers.ENDPOINT);
     URI typedUri = URI.create(uri);
     @Nullable String scheme = typedUri.getScheme();
     if (scheme == null) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
index d30675c..83c5d00 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 import com.google.protobuf.Message;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
@@ -32,10 +33,17 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class IngressJsonEntity implements JsonEntity {
 
+  private static final JsonPointer INGRESS_SPECS_POINTER = JsonPointer.compile("/ingresses");
+
+  private static final class MetaPointers {
+    private static final JsonPointer ID = JsonPointer.compile("/ingress/meta/id");
+    private static final JsonPointer TYPE = JsonPointer.compile("/ingress/meta/type");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> ingressNodes =
-        Selectors.listAt(moduleSpecRootNode, Pointers.INGRESSES_POINTER);
+        Selectors.listAt(moduleSpecRootNode, INGRESS_SPECS_POINTER);
 
     ingressNodes.forEach(
         ingressNode -> {
@@ -50,13 +58,13 @@ final class IngressJsonEntity implements JsonEntity {
   }
 
   private static IngressType ingressType(JsonNode spec) {
-    String typeString = Selectors.textAt(spec, Pointers.Ingress.META_TYPE);
+    String typeString = Selectors.textAt(spec, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(typeString);
     return new IngressType(nn.namespace(), nn.name());
   }
 
   private static IngressIdentifier<Message> ingressId(JsonNode ingress) {
-    String ingressId = Selectors.textAt(ingress, Pointers.Ingress.META_ID);
+    String ingressId = Selectors.textAt(ingress, MetaPointers.ID);
     NamespaceNamePair nn = NamespaceNamePair.from(ingressId);
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index 84e3eba..a507540 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -22,6 +22,7 @@ import java.net.URL;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -32,6 +33,10 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
 public final class JsonServiceLoader {
 
+  private static final JsonPointer FORMAT_VERSION = JsonPointer.compile("/version");
+  private static final JsonPointer MODULE_META_TYPE = JsonPointer.compile("/module/meta/type");
+  private static final JsonPointer MODULE_SPEC = JsonPointer.compile("/module/spec");
+
   public static Iterable<StatefulFunctionModule> load() {
     ObjectMapper mapper = mapper();
 
@@ -68,7 +73,7 @@ public final class JsonServiceLoader {
   }
 
   private static void validateMeta(URL moduleYamlFile, JsonNode root) {
-    JsonNode typeNode = root.at(Pointers.MODULE_META_TYPE);
+    JsonNode typeNode = root.at(MODULE_META_TYPE);
     if (typeNode.isMissingNode()) {
       throw new IllegalStateException("Unable to find a module type in " + moduleYamlFile);
     }
@@ -82,7 +87,7 @@ public final class JsonServiceLoader {
   }
 
   private static JsonNode requireValidModuleSpecNode(URL moduleYamlFile, JsonNode root) {
-    final JsonNode moduleSpecNode = root.at(Pointers.MODULE_SPEC);
+    final JsonNode moduleSpecNode = root.at(MODULE_SPEC);
 
     if (moduleSpecNode.isMissingNode()) {
       throw new IllegalStateException("A module without a spec at " + moduleYamlFile);
@@ -92,7 +97,7 @@ public final class JsonServiceLoader {
   }
 
   private static FormatVersion requireValidFormatVersion(JsonNode root) {
-    final String formatVersion = Selectors.textAt(root, Pointers.FORMAT_VERSION);
+    final String formatVersion = Selectors.textAt(root, FORMAT_VERSION);
     return FormatVersion.fromString(formatVersion);
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
deleted file mode 100644
index 5c5b973..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
-
-public final class Pointers {
-
-  private Pointers() {}
-
-  public static final JsonPointer FORMAT_VERSION = JsonPointer.compile("/version");
-
-  // -------------------------------------------------------------------------------------
-  // top level spec definition
-  // -------------------------------------------------------------------------------------
-
-  public static final JsonPointer MODULE_META_TYPE = JsonPointer.compile("/module/meta/type");
-  public static final JsonPointer MODULE_SPEC = JsonPointer.compile("/module/spec");
-  public static final JsonPointer FUNCTIONS_POINTER = JsonPointer.compile("/functions");
-  public static final JsonPointer ROUTERS_POINTER = JsonPointer.compile("/routers");
-  public static final JsonPointer INGRESSES_POINTER = JsonPointer.compile("/ingresses");
-  public static final JsonPointer EGRESSES_POINTER = JsonPointer.compile("/egresses");
-
-  // -------------------------------------------------------------------------------------
-  // function
-  // -------------------------------------------------------------------------------------
-
-  public static final class Functions {
-    public static final JsonPointer META_KIND = JsonPointer.compile("/function/meta/kind");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/function/meta/type");
-    public static final JsonPointer FUNCTION_HOSTNAME = JsonPointer.compile("/function/spec/host");
-    public static final JsonPointer FUNCTION_ENDPOINT =
-        JsonPointer.compile("/function/spec/endpoint");
-    public static final JsonPointer FUNCTION_PORT = JsonPointer.compile("/function/spec/port");
-    public static final JsonPointer FUNCTION_STATES = JsonPointer.compile("/function/spec/states");
-    public static final JsonPointer FUNCTION_TIMEOUT =
-        JsonPointer.compile("/function/spec/timeout");
-    public static final JsonPointer FUNCTION_MAX_NUM_BATCH_REQUESTS =
-        JsonPointer.compile("/function/spec/maxNumBatchRequests");
-  }
-
-  // -------------------------------------------------------------------------------------
-  // routers
-  // -------------------------------------------------------------------------------------
-
-  public static final class Routers {
-
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/router/meta/type");
-    public static final JsonPointer SPEC_INGRESS = JsonPointer.compile("/router/spec/ingress");
-    public static final JsonPointer SPEC_TARGET = JsonPointer.compile("/router/spec/target");
-    public static final JsonPointer SPEC_DESCRIPTOR =
-        JsonPointer.compile("/router/spec/descriptorSet");
-    public static final JsonPointer SPEC_MESSAGE_TYPE =
-        JsonPointer.compile("/router/spec/messageType");
-  }
-
-  // -------------------------------------------------------------------------------------
-  // ingresses
-  // -------------------------------------------------------------------------------------
-
-  public static final class Ingress {
-
-    public static final JsonPointer META_ID = JsonPointer.compile("/ingress/meta/id");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/ingress/meta/type");
-  }
-
-  public static final class Egress {
-
-    public static final JsonPointer META_ID = JsonPointer.compile("/egress/meta/id");
-    public static final JsonPointer META_TYPE = JsonPointer.compile("/egress/meta/type");
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
index ee0ad47..30c3ff2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -23,6 +23,7 @@ import com.google.protobuf.Message;
 import java.io.IOException;
 import java.net.URL;
 import java.util.Optional;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.ResourceLocator;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
@@ -35,10 +36,23 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 final class RouterJsonEntity implements JsonEntity {
 
+  private static final JsonPointer ROUTER_SPECS_POINTER = JsonPointer.compile("/routers");
+
+  private static final class MetaPointers {
+    private static final JsonPointer TYPE = JsonPointer.compile("/router/meta/type");
+  }
+
+  private static final class SpecPointers {
+    private static final JsonPointer INGRESS = JsonPointer.compile("/router/spec/ingress");
+    private static final JsonPointer TARGET = JsonPointer.compile("/router/spec/target");
+    private static final JsonPointer DESCRIPTOR = JsonPointer.compile("/router/spec/descriptorSet");
+    private static final JsonPointer MESSAGE_TYPE = JsonPointer.compile("/router/spec/messageType");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> routerNodes =
-        Selectors.listAt(moduleSpecRootNode, Pointers.ROUTERS_POINTER);
+        Selectors.listAt(moduleSpecRootNode, ROUTER_SPECS_POINTER);
 
     routerNodes.forEach(
         routerNode -> {
@@ -57,9 +71,9 @@ final class RouterJsonEntity implements JsonEntity {
   // ----------------------------------------------------------------------------------------------------------
 
   private static Router<Message> dynamicRouter(JsonNode router) {
-    String addressTemplate = Selectors.textAt(router, Pointers.Routers.SPEC_TARGET);
-    String descriptorSetPath = Selectors.textAt(router, Pointers.Routers.SPEC_DESCRIPTOR);
-    String messageType = Selectors.textAt(router, Pointers.Routers.SPEC_MESSAGE_TYPE);
+    String addressTemplate = Selectors.textAt(router, SpecPointers.TARGET);
+    String descriptorSetPath = Selectors.textAt(router, SpecPointers.DESCRIPTOR);
+    String messageType = Selectors.textAt(router, SpecPointers.MESSAGE_TYPE);
 
     ProtobufDescriptorMap descriptorPath = protobufDescriptorMap(descriptorSetPath);
     Optional<Descriptors.GenericDescriptor> maybeDescriptor =
@@ -92,13 +106,13 @@ final class RouterJsonEntity implements JsonEntity {
   }
 
   private static IngressIdentifier<Message> targetRouterIngress(JsonNode routerNode) {
-    String targetIngress = Selectors.textAt(routerNode, Pointers.Routers.SPEC_INGRESS);
+    String targetIngress = Selectors.textAt(routerNode, SpecPointers.INGRESS);
     NamespaceNamePair nn = NamespaceNamePair.from(targetIngress);
     return new IngressIdentifier<>(Message.class, nn.namespace(), nn.name());
   }
 
   private static void requireProtobufRouterType(JsonNode routerNode) {
-    String routerType = Selectors.textAt(routerNode, Pointers.Routers.META_TYPE);
+    String routerType = Selectors.textAt(routerNode, MetaPointers.TYPE);
     if (!routerType.equalsIgnoreCase("org.apache.flink.statefun.sdk/protobuf-router")) {
       throw new IllegalStateException("Invalid router type " + routerType);
     }