You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:01:58 UTC
[incubator-pulsar] branch branch-2.1 updated: Downgrading ZK to
stable version 3.4.13 (#2473)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6a02c94 Downgrading ZK to stable version 3.4.13 (#2473)
6a02c94 is described below
commit 6a02c9434fb3dbb821d0ce6766f0ba49f00565e2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 6 10:58:17 2018 -0700
Downgrading ZK to stable version 3.4.13 (#2473)
* Downgrading ZK to stable version 3.4.13
* Added integration test for ZK server downgrade
* Fixed ZookeeperClientFactoryImplTest.testZKCreationFailure
* Fixed dependencies version in license file
* Addressed comments
* Fixed jline version in license file
* There are 2 jline jars to count
---
distribution/server/licenses/LICENSE-JLine.txt | 29 +++++
distribution/server/src/assemble/LICENSE.bin.txt | 43 +++----
pom.xml | 8 +-
pulsar-zookeeper-utils/pom.xml | 57 +++++++++
.../pulsar/zookeeper/FileTxnSnapLogWrapper.java | 60 ----------
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 2 +-
.../pulsar/zookeeper/SerializeUtilsAspect.java | 130 +++++++++++++++++++++
.../zookeeper/ZookeeperBkClientFactoryImpl.java | 7 +-
.../zookeeper/ZookeeperClientFactoryImplTest.java | 12 +-
.../pulsar/zookeeper/ZooKeeperServerAspect.java | 16 ---
.../pulsar/tests/topologies/PulsarCluster.java | 13 +++
.../pulsar/tests/topologies/PulsarClusterSpec.java | 9 +-
.../tests/upgrade/PulsarZKDowngradeTest.java | 115 ++++++++++++++++++
.../src/main/resources/zk-3.5-test-data/log.1 | Bin 0 -> 20480 bytes
.../src/main/resources/zk-3.5-test-data/log.85 | Bin 0 -> 1024 bytes
.../src/main/resources/zk-3.5-test-data/snapshot.0 | Bin 0 -> 424 bytes
.../main/resources/zk-3.5-test-data/snapshot.84 | Bin 0 -> 10747 bytes
17 files changed, 394 insertions(+), 107 deletions(-)
diff --git a/distribution/server/licenses/LICENSE-JLine.txt b/distribution/server/licenses/LICENSE-JLine.txt
new file mode 100644
index 0000000..9a34d43
--- /dev/null
+++ b/distribution/server/licenses/LICENSE-JLine.txt
@@ -0,0 +1,29 @@
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index d813041..bef4498 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -370,27 +370,27 @@ The Apache Software License, Version 2.0
- org.apache.logging.log4j-log4j-web-2.10.0.jar
* Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
* BookKeeper
- - org.apache.bookkeeper-bookkeeper-common-4.7.1.jar
- - org.apache.bookkeeper-bookkeeper-proto-4.7.1.jar
- - org.apache.bookkeeper-bookkeeper-server-4.7.1.jar
- - org.apache.bookkeeper-circe-checksum-4.7.1.jar
- - org.apache.bookkeeper-statelib-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-api-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-common-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-java-client-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-java-client-base-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-proto-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-server-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-service-api-4.7.1.jar
- - org.apache.bookkeeper-stream-storage-service-impl-4.7.1.jar
- - org.apache.bookkeeper.http-http-server-4.7.1.jar
- - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.1.jar
- - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.1.jar
- - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.1.jar
- - org.apache.distributedlog-distributedlog-common-4.7.1.jar
- - org.apache.distributedlog-distributedlog-core-4.7.1-tests.jar
- - org.apache.distributedlog-distributedlog-core-4.7.1.jar
- - org.apache.distributedlog-distributedlog-protocol-4.7.1.jar
+ - org.apache.bookkeeper-bookkeeper-common-4.7.2.jar
+ - org.apache.bookkeeper-bookkeeper-proto-4.7.2.jar
+ - org.apache.bookkeeper-bookkeeper-server-4.7.2.jar
+ - org.apache.bookkeeper-circe-checksum-4.7.2.jar
+ - org.apache.bookkeeper-statelib-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-api-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-common-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-java-client-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-java-client-base-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-proto-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-server-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-service-api-4.7.2.jar
+ - org.apache.bookkeeper-stream-storage-service-impl-4.7.2.jar
+ - org.apache.bookkeeper.http-http-server-4.7.2.jar
+ - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.2.jar
+ - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.2.jar
+ - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.2.jar
+ - org.apache.distributedlog-distributedlog-common-4.7.2.jar
+ - org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
+ - org.apache.distributedlog-distributedlog-core-4.7.2.jar
+ - org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
* LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
@@ -467,6 +467,7 @@ BSD 3-clause "New" or "Revised" License
- com.ea.agentloader-ea-agent-loader-1.0.2.jar -- licenses/LICENSE-EA-Agent-Loader.txt
* Google auth library
- com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
+ * JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE-JLine.txt
* LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt
* JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt
diff --git a/pom.xml b/pom.xml
index e04cce4..6e26511 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,8 +128,8 @@ flexible messaging model and an intuitive client API.</description>
<!-- apache commons -->
<commons-compress.version>1.15</commons-compress.version>
- <bookkeeper.version>4.7.1</bookkeeper.version>
- <zookeeper.version>3.5.4-beta</zookeeper.version>
+ <bookkeeper.version>4.7.2</bookkeeper.version>
+ <zookeeper.version>3.4.13</zookeeper.version>
<netty.version>4.1.22.Final</netty.version>
<storm.version>1.0.5</storm.version>
<jetty.version>9.3.11.v20160721</jetty.version>
@@ -1006,6 +1006,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>generated-site/**</exclude>
<exclude>.github/*.md</exclude>
<exclude>**/.idea/*</exclude>
+ <exclude>**/zk-3.5-test-data/*</exclude>
</excludes>
<mapping>
<proto>JAVADOC_STYLE</proto>
@@ -1108,6 +1109,9 @@ flexible messaging model and an intuitive client API.</description>
<exclude>certificate-authority/index.txt</exclude>
<exclude>certificate-authority/README.md</exclude>
+ <!-- Exclude ZK test data file -->
+ <exclude>**/zk-3.5-test-data/*</exclude>
+
<!-- Python requirements files -->
<exclude>**/requirements.txt</exclude>
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 4234feb..6f496b4 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -73,6 +73,17 @@
</dependency>
<dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjrt</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjweaver</artifactId>
+ </dependency>
+
+
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.parent.version}</version>
@@ -102,7 +113,53 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>aspectj-maven-plugin</artifactId>
+ <configuration>
+ <complianceLevel>1.8</complianceLevel>
+ <source>1.8</source>
+ <target>1.8</target>
+ <showWeaveInfo>true</showWeaveInfo>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>aspectj-maven-plugin</artifactId>
+ <versionRange>[1.10,)</versionRange>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
deleted file mode 100644
index 45b9b78..0000000
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-@Slf4j
-public class FileTxnSnapLogWrapper extends FileTxnSnapLog {
-
- public FileTxnSnapLogWrapper(FileTxnSnapLog src) throws IOException {
- this(src.getDataDir(), src.getSnapDir());
- }
-
- public FileTxnSnapLogWrapper(File dataDir, File snapDir) throws IOException {
- super(dataDir, snapDir);
- }
-
- @Override
- public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
- try {
- return super.restore(dt, sessions, listener);
- } catch (IOException e) {
- if ("No snapshot found, but there are log entries. Something is broken!".equals(e.getMessage())) {
- log.info("Ignoring exception for missing ZK db");
- // Ignore error when snapshot is not found. This is needed when upgrading ZK from 3.4 to 3.5
- // https://issues.apache.org/jira/browse/ZOOKEEPER-3056
- save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
-
- /* return a zxid of zero, since we the database is empty */
- return 0;
- } else {
- throw e;
- }
- }
- }
-}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f6ab92c..f7923e4 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -154,7 +154,7 @@ public class LocalBookkeeperEnsemble {
try {
// Allow all commands on ZK control port
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
- zks = new ZooKeeperServer(new FileTxnSnapLogWrapper(zkDataDir, zkDataDir));
+ zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), maxCC);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
new file mode 100644
index 0000000..e7eb3cc
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.CreateTxnV0;
+import org.apache.zookeeper.txn.DeleteTxn;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.MultiTxn;
+import org.apache.zookeeper.txn.SetACLTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+
+/** Replaces ZK's {@code SerializeUtils.deserializeTxn()} with a copy that also decodes ZK 3.5.x txn types. */
+@Aspect
+public class SerializeUtilsAspect {
+    /** Around advice: decodes with {@link #deserializeTxn} and never proceeds to the intercepted method. */
+    @Around("execution(* org.apache.zookeeper.server.util.SerializeUtils.deserializeTxn(..))")
+    public Record wrapper(ProceedingJoinPoint joinPoint) throws IOException {
+        byte[] txnBytes = (byte[]) joinPoint.getArgs()[0];
+        TxnHeader hdr = (TxnHeader) joinPoint.getArgs()[1];
+        return deserializeTxn(txnBytes, hdr);
+    }
+
+    // ZK 3.5.x op codes (create2, createContainer, deleteContainer) handled on top of the OpCode constants
+    private static final int CREATE2 = 15;
+    private static final int CREATE_CONTAINER = 19;
+    private static final int DELETE_CONTAINER = 20;
+
+    /**
+     * Copied `deserializeTxn()` from ZK-3.4.13
+     * https://github.com/apache/zookeeper/blob/release-3.4.13/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java#L54
+     * with additions for handling `create2`, `createContainer` and `deleteContainer` transactions when downgrading
+     * from 3.5.x to 3.4.x.
+     *
+     * @param txnBytes serialized header followed by the transaction body
+     * @param hdr header instance that is populated from {@code txnBytes}
+     * @return the decoded transaction, or {@code null} for a closeSession entry
+     * @throws IOException if the transaction type is unsupported or the bytes cannot be decoded
+     */
+    public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr) throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
+        InputArchive ia = BinaryInputArchive.getArchive(bais);
+        hdr.deserialize(ia, "hdr");
+        bais.mark(bais.available());
+        Record txn = null;
+        switch (hdr.getType()) {
+        case OpCode.createSession:
+            // This isn't really an error txn; it just has the same format. The error represents the timeout
+            txn = new CreateSessionTxn();
+            break;
+        case OpCode.closeSession:
+            return null;
+        case OpCode.create:
+        case CREATE2: // create2 is treated as a create op in 3.5.x
+        case CREATE_CONTAINER: // createContainer can be treated as a regular create operation, since ZK 3.4 doesn't
+                               // have support for auto cleaning the empty directories
+            txn = new CreateTxn();
+            break;
+        case OpCode.delete:
+        case DELETE_CONTAINER: // deleteContainer is treated as regular delete in 3.5.x
+            txn = new DeleteTxn();
+            break;
+        case OpCode.setData:
+            txn = new SetDataTxn();
+            break;
+        case OpCode.setACL:
+            txn = new SetACLTxn();
+            break;
+        case OpCode.error:
+            txn = new ErrorTxn();
+            break;
+        case OpCode.multi:
+            txn = new MultiTxn();
+            break;
+        default:
+            throw new IOException("Unsupported Txn with type=" + hdr.getType());
+        }
+        if (txn != null) {
+            try {
+                txn.deserialize(ia, "txn");
+            } catch(EOFException e) {
+                // perhaps this is a V0 Create
+                if (hdr.getType() == OpCode.create) {
+                    CreateTxn create = (CreateTxn)txn;
+                    bais.reset();
+                    CreateTxnV0 createv0 = new CreateTxnV0();
+                    createv0.deserialize(ia, "txn");
+                    // cool now make it V1. a -1 parentCVersion will trigger fixup processing in processTxn
+                    create.setPath(createv0.getPath());
+                    create.setData(createv0.getData());
+                    create.setAcl(createv0.getAcl());
+                    create.setEphemeral(createv0.getEphemeral());
+                    create.setParentCVersion(-1);
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return txn;
+    }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index d18aea9..aac96ce 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,18 +23,19 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-@Slf4j
public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
+ private static final Logger log = LoggerFactory.getLogger(ZookeeperBkClientFactoryImpl.class);
+
private final OrderedExecutor executor;
public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
index 74f3970..7331471 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.zookeeper;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.test.PortManager;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
@@ -79,6 +79,12 @@ public class ZookeeperClientFactoryImplTest {
ZooKeeperClientFactory zkf = new ZookeeperClientFactoryImpl();
CompletableFuture<ZooKeeper> zkFuture = zkf.create("invalid", SessionType.ReadWrite,
(int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
- assertTrue(zkFuture.isCompletedExceptionally());
+
+ try {
+ zkFuture.get(3, TimeUnit.SECONDS);
+ fail("Should have thrown exception");
+ } catch (TimeoutException e) {
+ // Expected
+ }
}
}
diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
index c7642f3..24919be 100644
--- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
+++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
@@ -20,14 +20,9 @@ package org.apache.pulsar.zookeeper;
import io.prometheus.client.Gauge;
-import java.util.Arrays;
-
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.aspectj.lang.JoinPoint;
-import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
@@ -42,17 +37,6 @@ public class ZooKeeperServerAspect {
public void zkServerConstructorPointCut() {
}
- @Around("zkServerConstructorPointCut()")
- public void zkServerConstructorBefore(ProceedingJoinPoint joinPoint) throws Throwable {
- Object[] args = joinPoint.getArgs();
- if (args[0] instanceof FileTxnSnapLog) {
- // Wrap FileTxnSnapLog argument
- args[0] = new FileTxnSnapLogWrapper((FileTxnSnapLog)args[0]);
- }
-
- joinPoint.proceed(args);
- }
-
@After("zkServerConstructorPointCut()")
public void zkServerConstructor(JoinPoint joinPoint) throws Throwable {
// ZooKeeperServer instance was created
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 331d7e2..f2151ef 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -36,6 +36,7 @@ import java.util.stream.Stream;
import com.google.common.collect.Streams;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.tests.containers.BKContainer;
import org.apache.pulsar.tests.containers.BrokerContainer;
import org.apache.pulsar.tests.containers.CSContainer;
@@ -44,6 +45,9 @@ import org.apache.pulsar.tests.containers.PulsarContainer;
import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.containers.ZKContainer;
import org.testcontainers.containers.Container.ExecResult;
+
+import org.testcontainers.containers.BindMode;
+
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -134,6 +138,15 @@ public class PulsarCluster {
)
);
+ spec.classPathVolumeMounts.entrySet().forEach(e -> {
+ zkContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+ proxyContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+
+ bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+ brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+ workerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+ });
+
}
public String getPlainTextServiceUrl() {
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index ceb30b5..96133b3 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.tests.topologies;
import java.util.Collections;
import java.util.Map;
+import java.util.TreeMap;
+
import lombok.Builder;
import lombok.Builder.Default;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
-import org.apache.pulsar.tests.containers.ChaosContainer;
+
import org.testcontainers.containers.GenericContainer;
/**
@@ -100,4 +102,9 @@ public class PulsarClusterSpec {
@Default
boolean enableContainerLog = false;
+ /**
+ * Provide a map of paths (in the classpath) to mount as volumes inside the containers
+ */
+ @Builder.Default
+ Map<String, String> classPathVolumeMounts = new TreeMap<>();
}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java
new file mode 100644
index 0000000..6005aa7
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/upgrade/PulsarZKDowngradeTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.upgrade;
+
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.stream.Stream;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+/**
+ * Test downgrading ZK from 3.5.x to 3.4.x. This is part of the upgrade from Pulsar 2.1.0 to 2.1.1.
+ */
+@Slf4j
+public class PulsarZKDowngradeTest extends PulsarClusterTestBase {
+    protected static final int ENTRIES_PER_LEDGER = 1024;
+
+    /** Starts a 2-bookie / 1-broker cluster with the classpath resource {@code zk-3.5-test-data} mounted as a volume. */
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .clusterName(clusterName)
+                .classPathVolumeMounts(
+                        ImmutableMap.<String, String> builder()
+                                .put("zk-3.5-test-data", "/pulsar/data/zookeeper/version-2/version-2")
+                                .build())
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    /** Delegates to the base-class teardown; this override carries the {@code @AfterSuite} annotation. */
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    /** Publishes 10 string messages to a fresh topic and asserts they are received in order, payloads unchanged. */
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
+        String topicName = generateTopicName("testpubconsume", isPersistent);
+        int numMessages = 10;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("smoke-message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> m = consumer.receive();
+            assertEquals(m.getValue(), "smoke-message-" + i);
+        }
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1
new file mode 100644
index 0000000..6d5ae52
Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.1 differ
diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85
new file mode 100644
index 0000000..4ecb63b
Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/log.85 differ
diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0
new file mode 100644
index 0000000..3e6deee
Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.0 differ
diff --git a/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84 b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84
new file mode 100644
index 0000000..3520b95
Binary files /dev/null and b/tests/integration-tests-topologies/src/main/resources/zk-3.5-test-data/snapshot.84 differ