You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/06 17:27:56 UTC

[pulsar] branch master updated: Removed client-admin dependency from function-utils (#2739)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f8e7dfe  Removed client-admin dependency from function-utils (#2739)
f8e7dfe is described below

commit f8e7dfe1aa4f93c1fb724d705b10f89264583c27
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Oct 6 10:27:52 2018 -0700

    Removed client-admin dependency from function-utils (#2739)
---
 .../apache/pulsar/functions/instance/Utils.java    | 57 ++++++++++++++++++++++
 .../pulsar/functions/source/PulsarRecord.java      |  2 +-
 .../pulsar/functions/instance}/UtilsTest.java      |  2 +-
 pulsar-functions/utils/pom.xml                     | 14 ++++--
 .../pulsar/functions/utils/FunctionConfig.java     |  2 -
 .../functions/utils/FunctionConfigUtils.java       |  1 -
 .../apache/pulsar/functions/utils/Resources.java   |  3 --
 .../org/apache/pulsar/functions/utils/Utils.java   | 25 ----------
 .../validation/ConfigValidationAnnotations.java    |  2 -
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |  7 ++-
 10 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
new file mode 100644
index 0000000..7149bfe
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utils used for instance.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class Utils {
+
+    public static final long getSequenceId(MessageId messageId) {
+        MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+                ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+                : messageId);
+        long ledgerId = msgId.getLedgerId();
+        long entryId = msgId.getEntryId();
+
+        // Combine ledger id and entry id to form offset
+        // Use less than 32 bits to represent entry id since it will get
+        // rolled over way before overflowing the max int range
+        long offset = (ledgerId << 28) | entryId;
+        return offset;
+    }
+
+    public static final MessageId getMessageId(long sequenceId) {
+        // Demultiplex ledgerId and entryId from offset
+        long ledgerId = sequenceId >>> 28;
+        long entryId = sequenceId & 0x0F_FF_FF_FFL;
+
+        return new MessageIdImpl(ledgerId, entryId, -1);
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 359f48e..dc5a08a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -29,7 +29,7 @@ import lombok.ToString;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.instance.Utils;
 
 @Builder
 @Getter
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
similarity index 97%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
index 511270a..ba83c89 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.functions.instance;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 0d7f941..6c6930f 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -34,14 +34,20 @@
   <dependencies>
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-admin-original</artifactId>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-common</artifactId>
       <version>${project.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index d2b94e3..2e43edc 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,13 +29,11 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
 import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d546830..cf182a8 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
index 7e8a127..5c707fa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.functions.utils;
 
 import lombok.*;
 
-import java.util.HashMap;
-import java.util.Map;
-
 @Getter
 @Setter
 @Data
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 7befc85..d35be61 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -29,9 +29,6 @@ import java.lang.reflect.Type;
 import java.net.ServerSocket;
 import java.util.Collection;
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.io.core.Sink;
@@ -57,28 +54,6 @@ public class Utils {
     public static String FILE = "file";
     public static String BUILTIN = "builtin";
 
-    public static final long getSequenceId(MessageId messageId) {
-        MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
-                ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
-                : messageId);
-        long ledgerId = msgId.getLedgerId();
-        long entryId = msgId.getEntryId();
-
-        // Combine ledger id and entry id to form offset
-        // Use less than 32 bits to represent entry id since it will get
-        // rolled over way before overflowing the max int range
-        long offset = (ledgerId << 28) | entryId;
-        return offset;
-    }
-
-    public static final MessageId getMessageId(long sequenceId) {
-        // Demultiplex ledgerId and entryId from offset
-        long ledgerId = sequenceId >>> 28;
-        long entryId = sequenceId & 0x0F_FF_FF_FFL;
-
-        return new MessageIdImpl(ledgerId, entryId, -1);
-    }
-
     public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
     }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index 08f0d66..d562404 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.functions.utils.validation;
 
-import org.apache.pulsar.functions.utils.FunctionConfig;
-
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Repeatable;
 import java.lang.annotation.Retention;
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 91d043d..a4c589f 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -19,8 +19,6 @@
 
 package org.apache.pulsar.io.jdbc;
 
-import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
-
 import com.google.common.collect.Lists;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -139,8 +137,9 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
             if (log.isDebugEnabled()) {
                 log.debug("Starting flush, queue size: {}", incomingList.size());
             }
-            checkState(swapList.isEmpty(),
-                "swapList should be empty since last flush. swapList.size: " + swapList.size());
+            if (!swapList.isEmpty()) {
+                throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
+            }
 
             synchronized (incomingList) {
                 List<Record<T>> tmpList;