You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/06 17:27:56 UTC
[pulsar] branch master updated: Removed client-admin dependency from function-utils (#2739)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f8e7dfe Removed client-admin dependency from function-utils (#2739)
f8e7dfe is described below
commit f8e7dfe1aa4f93c1fb724d705b10f89264583c27
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Sat Oct 6 10:27:52 2018 -0700
Removed client-admin dependency from function-utils (#2739)
---
.../apache/pulsar/functions/instance/Utils.java | 57 ++++++++++++++++++++++
.../pulsar/functions/source/PulsarRecord.java | 2 +-
.../pulsar/functions/instance}/UtilsTest.java | 2 +-
pulsar-functions/utils/pom.xml | 14 ++++--
.../pulsar/functions/utils/FunctionConfig.java | 2 -
.../functions/utils/FunctionConfigUtils.java | 1 -
.../apache/pulsar/functions/utils/Resources.java | 3 --
.../org/apache/pulsar/functions/utils/Utils.java | 25 ----------
.../validation/ConfigValidationAnnotations.java | 2 -
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 7 ++-
10 files changed, 72 insertions(+), 43 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
new file mode 100644
index 0000000..7149bfe
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utils used for instance.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class Utils {
+
+ public static final long getSequenceId(MessageId messageId) {
+ MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+ ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+ : messageId);
+ long ledgerId = msgId.getLedgerId();
+ long entryId = msgId.getEntryId();
+
+ // Combine ledger id and entry id to form offset
+ // Use less than 32 bits to represent entry id since it will get
+ // rolled over way before overflowing the max int range
+ long offset = (ledgerId << 28) | entryId;
+ return offset;
+ }
+
+ public static final MessageId getMessageId(long sequenceId) {
+ // Demultiplex ledgerId and entryId from offset
+ long ledgerId = sequenceId >>> 28;
+ long entryId = sequenceId & 0x0F_FF_FF_FFL;
+
+ return new MessageIdImpl(ledgerId, entryId, -1);
+ }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 359f48e..dc5a08a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -29,7 +29,7 @@ import lombok.ToString;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.instance.Utils;
@Builder
@Getter
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
similarity index 97%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
index 511270a..ba83c89 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.functions.instance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 0d7f941..6c6930f 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -34,14 +34,20 @@
<dependencies>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-admin-original</artifactId>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index d2b94e3..2e43edc 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,13 +29,11 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
-import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d546830..cf182a8 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -27,7 +27,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
index 7e8a127..5c707fa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Resources.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.functions.utils;
import lombok.*;
-import java.util.HashMap;
-import java.util.Map;
-
@Getter
@Setter
@Data
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 7befc85..d35be61 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -29,9 +29,6 @@ import java.lang.reflect.Type;
import java.net.ServerSocket;
import java.util.Collection;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.io.core.Sink;
@@ -57,28 +54,6 @@ public class Utils {
public static String FILE = "file";
public static String BUILTIN = "builtin";
- public static final long getSequenceId(MessageId messageId) {
- MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
- ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
- : messageId);
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
-
- // Combine ledger id and entry id to form offset
- // Use less than 32 bits to represent entry id since it will get
- // rolled over way before overflowing the max int range
- long offset = (ledgerId << 28) | entryId;
- return offset;
- }
-
- public static final MessageId getMessageId(long sequenceId) {
- // Demultiplex ledgerId and entryId from offset
- long ledgerId = sequenceId >>> 28;
- long entryId = sequenceId & 0x0F_FF_FF_FFL;
-
- return new MessageIdImpl(ledgerId, entryId, -1);
- }
-
public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
index 08f0d66..d562404 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ConfigValidationAnnotations.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.functions.utils.validation;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 91d043d..a4c589f 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -19,8 +19,6 @@
package org.apache.pulsar.io.jdbc;
-import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
-
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -139,8 +137,9 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
- checkState(swapList.isEmpty(),
- "swapList should be empty since last flush. swapList.size: " + swapList.size());
+ if (!swapList.isEmpty()) {
+ throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
+ }
synchronized (incomingList) {
List<Record<T>> tmpList;