You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/15 06:29:52 UTC
[pulsar] branch master updated: Added an explicit field in the
function details for componenttype (#4250)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ad4c9f3 Added an explicit field in the function details for componenttype (#4250)
ad4c9f3 is described below
commit ad4c9f3608cb228bbb59bade7e4e12ccd14e2fb3
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue May 14 23:29:45 2019 -0700
Added an explicit field in the function details for componenttype (#4250)
* Added an explicit field in the function details for componenttype
* Fixed unittests
* Updated the defn of python pb file
* Added licence
* Took feedback into account
* Added unittest
---
.../pulsar/functions/instance/ContextImpl.java | 6 +-
.../pulsar/functions/instance/InstanceUtils.java | 19 +-
.../functions/instance/JavaInstanceRunnable.java | 3 +-
.../instance/stats/ComponentStatsManager.java | 5 +-
.../instance/src/main/python/Function_pb2.py | 366 +++++++++++++--------
.../pulsar/functions/instance/ContextImplTest.java | 5 +-
.../functions/instance/InstanceUtilsTest.java | 77 +++++
.../proto/src/main/proto/Function.proto | 7 +
.../functions/runtime/KubernetesRuntime.java | 3 +-
...{ComponentType.java => ComponentTypeUtils.java} | 27 +-
.../functions/utils/FunctionConfigUtils.java | 2 +
.../pulsar/functions/utils/SinkConfigUtils.java | 2 +
.../pulsar/functions/utils/SourceConfigUtils.java | 2 +
.../functions/worker/rest/api/ComponentImpl.java | 237 +++++++------
.../functions/worker/rest/api/FunctionsImpl.java | 3 +-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 14 +-
.../functions/worker/rest/api/SourceImpl.java | 14 +-
.../worker/rest/api/FunctionsImplTest.java | 5 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 3 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 13 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 9 +-
22 files changed, 500 insertions(+), 331 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index f0d5ade..5bf8c59 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -35,10 +35,10 @@ import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
@@ -91,11 +91,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}
- private final ComponentType componentType;
+ private final Function.FunctionDetails.ComponentType componentType;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
- ComponentType componentType, ComponentStatsManager statsManager) {
+ Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 5cae6a8..e73f1ce 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.functions.instance;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import lombok.experimental.UtilityClass;
@@ -31,7 +28,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
@@ -89,23 +85,26 @@ public class InstanceUtils {
}
}
- public ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+ public Function.FunctionDetails.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+ if (functionDetails.getComponentType() != Function.FunctionDetails.ComponentType.UNKNOWN) {
+ return functionDetails.getComponentType();
+ }
Function.SourceSpec sourceSpec = functionDetails.getSource();
Function.SinkSpec sinkSpec = functionDetails.getSink();
if (sourceSpec.getInputSpecsCount() == 0) {
- return SOURCE;
+ return Function.FunctionDetails.ComponentType.SOURCE;
}
// Now its between sink and function
if (!isEmpty(sinkSpec.getBuiltin())) {
// if its built in, its a sink
- return SINK;
+ return Function.FunctionDetails.ComponentType.SINK;
}
if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
- return FUNCTION;
+ return Function.FunctionDetails.ComponentType.FUNCTION;
}
- return SINK;
+ return Function.FunctionDetails.ComponentType.SINK;
}
public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
@@ -119,7 +118,7 @@ public class InstanceUtils {
functionDetails.getName());
}
- public static Map<String, String> getProperties(ComponentType componentType,
+ public static Map<String, String> getProperties(Function.FunctionDetails.ComponentType componentType,
String fullyQualifiedName, int instanceId) {
Map<String, String> properties = new HashMap<>();
switch (componentType) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6b265ae..b6c4fc8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -66,7 +66,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
@@ -129,7 +128,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private InstanceCache instanceCache;
- private final ComponentType componentType;
+ private final org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType;
private final Map<String, String> properties;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index b03fbf0..716fded 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.functions.instance.stats;
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.proto.Function;
import java.io.IOException;
import java.io.StringWriter;
@@ -58,7 +57,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
- ComponentType componentType) {
+ Function.FunctionDetails.ComponentType componentType) {
switch (componentType) {
case FUNCTION:
return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 8187e09..409f749 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -17,6 +17,7 @@
# under the License.
#
+# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: Function.proto
@@ -27,7 +28,6 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
-from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -39,7 +39,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='Function.proto',
package='proto',
syntax='proto3',
- serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+ serialized_options=_b('\n!org.apache.pulsar.functions.protoB\010Function'),
+ serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\x85\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -50,21 +51,21 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='ATLEAST_ONCE', index=0, number=0,
- options=None,
+ serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ATMOST_ONCE', index=1, number=1,
- options=None,
+ serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='EFFECTIVELY_ONCE', index=2, number=2,
- options=None,
+ serialized_options=None,
type=None),
],
containing_type=None,
- options=None,
- serialized_start=2022,
- serialized_end=2101,
+ serialized_options=None,
+ serialized_start=2302,
+ serialized_end=2381,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -77,17 +78,17 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='SHARED', index=0, number=0,
- options=None,
+ serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FAILOVER', index=1, number=1,
- options=None,
+ serialized_options=None,
type=None),
],
containing_type=None,
- options=None,
- serialized_start=2103,
- serialized_end=2147,
+ serialized_options=None,
+ serialized_start=2383,
+ serialized_end=2427,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -100,17 +101,17 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='RUNNING', index=0, number=0,
- options=None,
+ serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='STOPPED', index=1, number=1,
- options=None,
+ serialized_options=None,
type=None),
],
containing_type=None,
- options=None,
- serialized_start=2149,
- serialized_end=2190,
+ serialized_options=None,
+ serialized_start=2429,
+ serialized_end=2470,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
@@ -132,20 +133,54 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='JAVA', index=0, number=0,
- options=None,
+ serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='PYTHON', index=1, number=1,
- options=None,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='GO', index=2, number=3,
+ serialized_options=None,
type=None),
],
containing_type=None,
- options=None,
- serialized_start=604,
- serialized_end=635,
+ serialized_options=None,
+ serialized_start=687,
+ serialized_end=726,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
+_FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor(
+ name='ComponentType',
+ full_name='proto.FunctionDetails.ComponentType',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='UNKNOWN', index=0, number=0,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='FUNCTION', index=1, number=1,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='SOURCE', index=2, number=2,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='SINK', index=3, number=3,
+ serialized_options=None,
+ type=None),
+ ],
+ containing_type=None,
+ serialized_options=None,
+ serialized_start=728,
+ serialized_end=792,
+)
+_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
+
_RESOURCES = _descriptor.Descriptor(
name='Resources',
@@ -160,28 +195,28 @@ _RESOURCES = _descriptor.Descriptor(
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='ram', full_name='proto.Resources.ram', index=1,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='disk', full_name='proto.Resources.disk', index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@@ -205,21 +240,21 @@ _RETRYDETAILS = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@@ -243,127 +278,142 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='namespace', full_name='proto.FunctionDetails.namespace', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='name', full_name='proto.FunctionDetails.name', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='className', full_name='proto.FunctionDetails.className', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=5,
number=6, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
number=16, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='runtime', full_name='proto.FunctionDetails.runtime', index=8,
number=8, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9,
number=9, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parallelism', full_name='proto.FunctionDetails.parallelism', index=10,
number=10, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='source', full_name='proto.FunctionDetails.source', index=11,
number=11, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='sink', full_name='proto.FunctionDetails.sink', index=12,
number=12, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='resources', full_name='proto.FunctionDetails.resources', index=13,
number=13, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14,
number=14, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=15,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags', index=16,
+ number=17, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='componentType', full_name='proto.FunctionDetails.componentType', index=17,
+ number=18, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_FUNCTIONDETAILS_RUNTIME,
+ _FUNCTIONDETAILS_COMPONENTTYPE,
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=147,
- serialized_end=635,
+ serialized_end=792,
)
@@ -380,21 +430,21 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=788,
- serialized_end=822,
+ serialized_start=945,
+ serialized_end=979,
)
_CONSUMERSPEC = _descriptor.Descriptor(
@@ -410,42 +460,42 @@ _CONSUMERSPEC = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='isRegexPattern', full_name='proto.ConsumerSpec.isRegexPattern', index=2,
number=3, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='receiverQueueSize', full_name='proto.ConsumerSpec.receiverQueueSize', index=3,
number=4, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=638,
- serialized_end=822,
+ serialized_start=795,
+ serialized_end=979,
)
@@ -462,28 +512,28 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ serialized_options=_b('8\001'),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1205,
- serialized_end=1266,
+ serialized_start=1362,
+ serialized_end=1423,
)
_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -499,28 +549,28 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='proto.SourceSpec.InputSpecsEntry.value', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ serialized_options=_b('8\001'),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1268,
- serialized_end=1338,
+ serialized_start=1425,
+ serialized_end=1495,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -536,91 +586,91 @@ _SOURCESPEC = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='configs', full_name='proto.SourceSpec.configs', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001')), file=DESCRIPTOR),
+ serialized_options=_b('\030\001'), file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5,
number=10, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=6,
number=6, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', index=7,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001')), file=DESCRIPTOR),
+ serialized_options=_b('\030\001'), file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='builtin', full_name='proto.SourceSpec.builtin', index=8,
number=8, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', index=9,
number=9, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10,
number=11, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY, _SOURCESPEC_INPUTSPECSENTRY, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=825,
- serialized_end=1338,
+ serialized_start=982,
+ serialized_end=1495,
)
@@ -637,63 +687,63 @@ _SINKSPEC = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='configs', full_name='proto.SinkSpec.configs', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='topic', full_name='proto.SinkSpec.topic', index=3,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='builtin', full_name='proto.SinkSpec.builtin', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='schemaType', full_name='proto.SinkSpec.schemaType', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1341,
- serialized_end=1486,
+ serialized_start=1498,
+ serialized_end=1643,
)
@@ -710,28 +760,28 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='originalFileName', full_name='proto.PackageLocationMetaData.originalFileName', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1488,
- serialized_end=1560,
+ serialized_start=1645,
+ serialized_end=1717,
)
@@ -748,28 +798,28 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='proto.FunctionMetaData.InstanceStatesEntry.value', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ serialized_options=_b('8\001'),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1795,
- serialized_end=1870,
+ serialized_start=2013,
+ serialized_end=2088,
)
_FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -785,49 +835,94 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='packageLocation', full_name='proto.FunctionMetaData.packageLocation', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='version', full_name='proto.FunctionMetaData.version', index=2,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='createTime', full_name='proto.FunctionMetaData.createTime', index=3,
number=4, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceStates', full_name='proto.FunctionMetaData.instanceStates', index=4,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='functionAuthSpec', full_name='proto.FunctionMetaData.functionAuthSpec', index=5,
+ number=6, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_FUNCTIONMETADATA_INSTANCESTATESENTRY, ],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1563,
- serialized_end=1870,
+ serialized_start=1720,
+ serialized_end=2088,
+)
+
+
+_FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
+ name='FunctionAuthenticationSpec',
+ full_name='proto.FunctionAuthenticationSpec',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b(""),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='provider', full_name='proto.FunctionAuthenticationSpec.provider', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2090,
+ serialized_end=2150,
)
@@ -844,28 +939,28 @@ _INSTANCE = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceId', full_name='proto.Instance.instanceId', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1872,
- serialized_end=1953,
+ serialized_start=2152,
+ serialized_end=2233,
)
@@ -882,28 +977,28 @@ _ASSIGNMENT = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='workerId', full_name='proto.Assignment.workerId', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
- options=None,
+ serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=1955,
- serialized_end=2020,
+ serialized_start=2235,
+ serialized_end=2300,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -912,7 +1007,9 @@ _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
_FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
_FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
_FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
+_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_COMPONENTTYPE
_FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
+_FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
_CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
_CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = _CONSUMERSPEC_RECEIVERQUEUESIZE
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
@@ -926,6 +1023,7 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA
_FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS
_FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA
_FUNCTIONMETADATA.fields_by_name['instanceStates'].message_type = _FUNCTIONMETADATA_INSTANCESTATESENTRY
+_FUNCTIONMETADATA.fields_by_name['functionAuthSpec'].message_type = _FUNCTIONAUTHENTICATIONSPEC
_INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
_ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
@@ -936,6 +1034,7 @@ DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA
DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
+DESCRIPTOR.message_types_by_name['FunctionAuthenticationSpec'] = _FUNCTIONAUTHENTICATIONSPEC
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
DESCRIPTOR.enum_types_by_name['ProcessingGuarantees'] = _PROCESSINGGUARANTEES
@@ -1031,6 +1130,13 @@ FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData',
_sym_db.RegisterMessage(FunctionMetaData)
_sym_db.RegisterMessage(FunctionMetaData.InstanceStatesEntry)
+FunctionAuthenticationSpec = _reflection.GeneratedProtocolMessageType('FunctionAuthenticationSpec', (_message.Message,), dict(
+ DESCRIPTOR = _FUNCTIONAUTHENTICATIONSPEC,
+ __module__ = 'Function_pb2'
+ # @@protoc_insertion_point(class_scope:proto.FunctionAuthenticationSpec)
+ ))
+_sym_db.RegisterMessage(FunctionAuthenticationSpec)
+
Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE,
__module__ = 'Function_pb2'
@@ -1046,16 +1152,10 @@ Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Me
_sym_db.RegisterMessage(Assignment)
-DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
-_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SOURCESPEC_INPUTSPECSENTRY.has_options = True
-_SOURCESPEC_INPUTSPECSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SOURCESPEC.fields_by_name['topicsToSerDeClassName'].has_options = True
-_SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
-_SOURCESPEC.fields_by_name['topicsPattern'].has_options = True
-_SOURCESPEC.fields_by_name['topicsPattern']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
-_FUNCTIONMETADATA_INSTANCESTATESENTRY.has_options = True
-_FUNCTIONMETADATA_INSTANCESTATESENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+DESCRIPTOR._options = None
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = None
+_SOURCESPEC_INPUTSPECSENTRY._options = None
+_SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = None
+_SOURCESPEC.fields_by_name['topicsPattern']._options = None
+_FUNCTIONMETADATA_INSTANCESTATESENTRY._options = None
# @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 7eb7aae..43a5649 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.mockito.Matchers;
import org.slf4j.Logger;
import org.testng.annotations.BeforeMethod;
@@ -42,11 +41,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -89,7 +86,7 @@ public class ContextImplTest {
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
- ComponentType.FUNCTION, null);
+ FunctionDetails.ComponentType.FUNCTION, null);
context.setCurrentMessageContext(new Record<String>() {
@Override
public String getValue() {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java
new file mode 100644
index 0000000..698df6c
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class InstanceUtilsTest {
+
+ /**
+ * Test the calculateSubjectType function for sources
+ */
+ @Test
+ public void testCalculateSubjectTypeForSource() {
+ FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+ // no input topics mean source
+ builder.setSource(Function.SourceSpec.newBuilder().build());
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+ // make sure that if the componenttype is set, that gets precedence.
+ builder.setComponentType(FunctionDetails.ComponentType.SINK);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+ builder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+ }
+
+ /**
+ * Test the calculateSubjectType function for function
+ */
+ @Test
+ public void testCalculateSubjectTypeForFunction() {
+ FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+ // an input but no sink classname is a function
+ builder.setSource(Function.SourceSpec.newBuilder().putInputSpecs("topic", Function.ConsumerSpec.newBuilder().build()).build());
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+ // make sure that if the componenttype is set, that gets precedence.
+ builder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+ builder.setComponentType(FunctionDetails.ComponentType.SINK);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+ }
+
+ /**
+ * Test the calculateSubjectType function for Sink
+ */
+ @Test
+ public void testCalculateSubjectTypeForSink() {
+ FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+ // an input and a sink classname is a sink
+ builder.setSource(Function.SourceSpec.newBuilder().putInputSpecs("topic", Function.ConsumerSpec.newBuilder().build()).build());
+ builder.setSink(Function.SinkSpec.newBuilder().setClassName("something").build());
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+ // make sure that if the componenttype is set, that gets precedence.
+ builder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+ builder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+ assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+ }
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index c435c09..34b6c90 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -51,6 +51,12 @@ message FunctionDetails {
PYTHON = 1;
GO = 3;
}
+ enum ComponentType {
+ UNKNOWN = 0;
+ FUNCTION = 1;
+ SOURCE = 2;
+ SINK = 3;
+ }
string tenant = 1;
string namespace = 2;
string name = 3;
@@ -68,6 +74,7 @@ message FunctionDetails {
string packageUrl = 14; //present only if function submitted with package-url
RetryDetails retryDetails = 15;
string runtimeFlags = 17;
+ ComponentType componentType = 18;
}
message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3b10092..4c1e3d9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -62,7 +62,6 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import java.io.IOException;
@@ -853,7 +852,7 @@ public class KubernetesRuntime implements Runtime {
private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
final Map<String, String> labels = new HashMap<>();
- ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
+ Function.FunctionDetails.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
String component;
switch (componentType) {
case FUNCTION:
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
similarity index 63%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
index a99edc2..240eb63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
@@ -16,21 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.utils;
-
-public enum ComponentType {
- FUNCTION("Function"),
- SOURCE("Source"),
- SINK("Sink");
- private final String componentName;
+package org.apache.pulsar.functions.utils;
- ComponentType(String componentName) {
- this.componentName = componentName;
- }
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
- @Override
- public String toString() {
- return componentName;
+public class ComponentTypeUtils {
+ public static String toString(FunctionDetails.ComponentType componentType) {
+ switch (componentType) {
+ case FUNCTION:
+ return "Function";
+ case SOURCE:
+ return "Source";
+ case SINK:
+ return "Sink";
+ default:
+ throw new RuntimeException("Unknown componentType " + componentType);
+ }
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index a9e5a00..e207e1e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -227,6 +227,8 @@ public class FunctionConfigUtils {
functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
}
+ functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+
return functionDetailsBuilder.build();
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 65cf6f5..40e96b2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -203,6 +203,8 @@ public class SinkConfigUtils {
functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
}
+ functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SINK);
+
return functionDetailsBuilder.build();
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 01ea03a..90d63df 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -142,6 +142,8 @@ public class SourceConfigUtils {
functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
}
+ functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+
return functionDetailsBuilder.build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 7065a49..d9804e5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -66,11 +66,11 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -113,9 +113,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -126,9 +123,9 @@ public abstract class ComponentImpl {
private final AtomicReference<StorageClient> storageClient = new AtomicReference<>();
protected final Supplier<WorkerService> workerServiceSupplier;
- protected final ComponentType componentType;
+ protected final Function.FunctionDetails.ComponentType componentType;
- public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, ComponentType componentType) {
+ public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, Function.FunctionDetails.ComponentType componentType) {
this.workerServiceSupplier = workerServiceSupplier;
this.componentType = componentType;
}
@@ -307,13 +304,13 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
}
if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
+ throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
}
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -337,7 +334,7 @@ public abstract class ComponentImpl {
}
} catch (PulsarAdminException.NotAuthorizedException e) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
@@ -350,8 +347,8 @@ public abstract class ComponentImpl {
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", componentType, componentName));
+ log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionDetails functionDetails = null;
@@ -369,7 +366,7 @@ public abstract class ComponentImpl {
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
} catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
}
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
componentConfigJson, componentType, componentPackageFile);
@@ -380,19 +377,19 @@ public abstract class ComponentImpl {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
componentConfigJson, componentType, componentPackageFile);
if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(componentType + " Package is not provided");
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
}
}
} catch (Exception e) {
- log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
- log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
+ log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
}
// function state
@@ -418,10 +415,10 @@ public abstract class ComponentImpl {
.build());
}
} catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
}
}
}
@@ -431,7 +428,7 @@ public abstract class ComponentImpl {
packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
functionPkgUrl, fileDetail, componentPackageFile);
} catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ", componentType, tenant, namespace, componentName, e);
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
@@ -461,7 +458,7 @@ public abstract class ComponentImpl {
// For externally managed schedulers, the pkgUrl/builtin stuff should be copied to bk
if (isBuiltin) {
File sinkOrSource;
- if (componentType.equals(SOURCE)) {
+ if (componentType == FunctionDetails.ComponentType.SOURCE) {
String archiveName = functionDetails.getSource().getBuiltin();
sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
} else {
@@ -471,13 +468,13 @@ public abstract class ComponentImpl {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
sinkOrSource.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
- log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+ log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
} else if (isPkgUrlProvided) {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
uploadedInputStreamAsFile.getName()));
packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
- log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+ log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
} else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)
|| functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
@@ -485,13 +482,13 @@ public abstract class ComponentImpl {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileName));
packageLocationMetaDataBuilder.setOriginalFileName(fileName);
- log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+ log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
- log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+ log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
}
} else {
@@ -506,7 +503,7 @@ public abstract class ComponentImpl {
} else {
packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
- log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+ log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
}
}
@@ -536,13 +533,13 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
}
if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
+ throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
}
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
@@ -554,7 +551,7 @@ public abstract class ComponentImpl {
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", componentType, componentName));
+ throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
String mergedComponentConfigJson;
@@ -563,11 +560,11 @@ public abstract class ComponentImpl {
FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
- if (componentType.equals(FUNCTION)) {
+ if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
@@ -582,7 +579,7 @@ public abstract class ComponentImpl {
} catch (Exception e) {
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
- } else if (componentType.equals(SOURCE)) {
+ } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
@@ -627,7 +624,7 @@ public abstract class ComponentImpl {
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
} catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
}
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
mergedComponentConfigJson, componentType, componentPackageFile);
@@ -637,7 +634,7 @@ public abstract class ComponentImpl {
try {
componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
} catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
}
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
mergedComponentConfigJson, componentType, componentPackageFile);
@@ -651,7 +648,7 @@ public abstract class ComponentImpl {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
mergedComponentConfigJson, componentType, componentPackageFile);
if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(componentType + " Package is not provided");
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
}
} else {
@@ -664,16 +661,16 @@ public abstract class ComponentImpl {
mergedComponentConfigJson, componentType, componentPackageFile);
}
} catch (Exception e) {
- log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
try {
worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
} catch (Exception e) {
- log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
- componentType, componentName, e.getMessage()));
+ ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
}
// merge from existing metadata
@@ -707,8 +704,8 @@ public abstract class ComponentImpl {
functionMetaDataBuilder.clearFunctionAuthSpec();
}
} catch (Exception e) {
- log.error("Error updating authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+ log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
}
}
}
@@ -719,7 +716,7 @@ public abstract class ComponentImpl {
packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
functionPkgUrl, fileDetail, componentPackageFile);
} catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ", componentType, tenant, namespace, componentName, e);
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
} else {
@@ -750,7 +747,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -776,20 +773,20 @@ public abstract class ComponentImpl {
try {
validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid deregister {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} to deregister does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
@@ -803,11 +800,11 @@ public abstract class ComponentImpl {
}
} catch (ExecutionException e) {
log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
- componentType, tenant, namespace, componentName, e);
+ ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
} catch (InterruptedException e) {
log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
- componentType, tenant, namespace, componentName, e);
+ ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
}
}
@@ -825,7 +822,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -837,19 +834,19 @@ public abstract class ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
@@ -891,7 +888,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -903,20 +900,20 @@ public abstract class ComponentImpl {
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
} catch (IllegalArgumentException e) {
- log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
@@ -930,7 +927,7 @@ public abstract class ComponentImpl {
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to start/stop {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
+ log.error("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -949,7 +946,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -961,20 +958,20 @@ public abstract class ComponentImpl {
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
} catch (IllegalArgumentException e) {
- log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -984,7 +981,7 @@ public abstract class ComponentImpl {
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
+ log.error("Failed to restart {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -1019,7 +1016,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1031,20 +1028,20 @@ public abstract class ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
@@ -1057,7 +1054,7 @@ public abstract class ComponentImpl {
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to start/stop {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -1074,7 +1071,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1086,20 +1083,20 @@ public abstract class ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1108,7 +1105,7 @@ public abstract class ComponentImpl {
} catch (WebApplicationException we) {
throw we;
} catch (Exception e) {
- log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Failed to restart {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -1126,7 +1123,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1138,20 +1135,20 @@ public abstract class ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1182,7 +1179,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1194,26 +1191,26 @@ public abstract class ComponentImpl {
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
int instanceIdInt = Integer.parseInt(instanceId);
if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
- log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
+ log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), componentName, instanceId));
}
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1242,7 +1239,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
- log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, componentType);
+ log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1254,7 +1251,7 @@ public abstract class ComponentImpl {
try {
validateListFunctionRequestParams(tenant, namespace);
} catch (IllegalArgumentException e) {
- log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
+ log.error("Invalid list {} request @ /{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
@@ -1315,7 +1312,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to trigger {}", tenant, namespace,
- functionName, clientRole, componentType);
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1430,7 +1427,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to get state for {}", tenant, namespace,
- functionName, clientRole, componentType);
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1549,7 +1546,7 @@ public abstract class ComponentImpl {
protected void validateGetFunctionInstanceRequestParams(final String tenant,
final String namespace,
final String componentName,
- final ComponentType componentType,
+ final FunctionDetails.ComponentType componentType,
final String instanceId) throws IllegalArgumentException {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
if (instanceId == null) {
@@ -1557,7 +1554,7 @@ public abstract class ComponentImpl {
}
}
- protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, ComponentType componentType)
+ protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, FunctionDetails.ComponentType componentType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -1567,11 +1564,11 @@ public abstract class ComponentImpl {
throw new IllegalArgumentException("Namespace is not provided");
}
if (subject == null) {
- throw new IllegalArgumentException(componentType + " Name is not provided");
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Name is not provided");
}
}
- private void validateDeregisterRequestParams(String tenant, String namespace, String subject, ComponentType componentType)
+ private void validateDeregisterRequestParams(String tenant, String namespace, String subject, FunctionDetails.ComponentType componentType)
throws IllegalArgumentException {
if (tenant == null) {
@@ -1581,7 +1578,7 @@ public abstract class ComponentImpl {
throw new IllegalArgumentException("Namespace is not provided");
}
if (subject == null) {
- throw new IllegalArgumentException(componentType + " Name is not provided");
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Name is not provided");
}
}
@@ -1627,7 +1624,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String componentName,
final String componentConfigJson,
- final ComponentType componentType,
+ final FunctionDetails.ComponentType componentType,
final File componentPackageFile) throws IOException {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
@@ -1636,10 +1633,10 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
throw new IllegalArgumentException("Namespace is not provided");
}
if (componentName == null) {
- throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
+ throw new IllegalArgumentException(String.format("%s Name is not provided", ComponentTypeUtils.toString(componentType)));
}
- if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
+ if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) && !isEmpty(componentConfigJson)) {
FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
// The rest end points take precedence over whatever is there in functionconfig
functionConfig.setTenant(tenant);
@@ -1649,7 +1646,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
- if (componentType.equals(SOURCE)) {
+ if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
Path archivePath = null;
SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
// The rest end points take precedence over whatever is there in sourceconfig
@@ -1671,7 +1668,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
}
- if (componentType.equals(SINK)) {
+ if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
Path archivePath = null;
SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
// The rest end points take precedence over whatever is there in sinkConfig
@@ -1693,7 +1690,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
} else {
- throw new IllegalArgumentException("Unrecognized component type: " + componentType);
+ throw new IllegalArgumentException("Unrecognized component type: " + ComponentTypeUtils.toString(componentType));
}
}
@@ -1769,7 +1766,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
try {
if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized get status for {}", tenant, namespace,
- componentName, clientRole, componentType);
+ componentName, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -1781,20 +1778,20 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} Status request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{} in get {} Status does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), componentType, tenant, namespace, componentName);
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
}
@@ -1810,9 +1807,9 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
int parallelism = functionMetaData.getFunctionDetails().getParallelism();
if (instanceId < 0 || instanceId >= parallelism) {
- log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+ log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST,
- String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
+ String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), componentName, instanceId));
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 93642f6..bf09fbf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,7 +25,6 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
@@ -197,7 +196,7 @@ public class FunctionsImpl extends ComponentImpl {
}
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.FUNCTION);
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
}
/**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index c0cac01..e7b9ff1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -207,7 +207,7 @@ public class SinkImpl extends ComponentImpl {
}
public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SINK);
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
}
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
@@ -270,19 +270,19 @@ public class SinkImpl extends ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 43bb4e7..a2b59ef 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -209,7 +209,7 @@ public class SourceImpl extends ComponentImpl {
}
public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, ComponentType.SOURCE);
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
}
public SourceStatus getSourceStatus(final String tenant,
@@ -268,19 +268,19 @@ public class SourceImpl extends ComponentImpl {
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
} catch (IllegalArgumentException e) {
- log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+ log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
- throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+ throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
- throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+ log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
}
SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 78248e0..dcfa8f1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
@@ -62,9 +61,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
@@ -192,7 +189,7 @@ public class FunctionsImplTest {
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION); }
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION); }
@Test
public void testStatusEmpty() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 1790fe7..17b0234 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -183,7 +182,7 @@ public class FunctionApiV2ResourceTest {
FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
this.resource = spy(new FunctionsImplV2(functions));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index ffbf552..696bd19 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -43,7 +43,6 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -78,9 +77,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -88,7 +84,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
@@ -183,7 +178,7 @@ public class FunctionApiV3ResourceTest {
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
}
//
@@ -1406,9 +1401,9 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
List<String> functionList = listDefaultFunctions();
assertEquals(functions, functionList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 827cabf..59ee08b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -178,7 +177,7 @@ public class SinkApiV3ResourceTest {
this.resource = spy(new SinkImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.SINK);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK);
}
//
@@ -1336,9 +1335,9 @@ public class SinkApiV3ResourceTest {
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
List<String> sinkList = listDefaultSinks();
assertEquals(functions, sinkList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 93a3cea..c56066b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -173,7 +172,7 @@ public class SourceApiV3ResourceTest {
this.resource = spy(new SourceImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.SOURCE);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE);
}
//
@@ -1345,9 +1344,9 @@ public class SourceApiV3ResourceTest {
functionMetaDataList.add(f3);
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
- PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+ PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
List<String> sourceList = listDefaultSources();
assertEquals(functions, sourceList);