You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/06 17:58:19 UTC

[GitHub] merlimat closed pull request #2473: Downgrading ZK to stable version 3.4.13

merlimat closed pull request #2473: Downgrading ZK to stable version 3.4.13
URL: https://github.com/apache/incubator-pulsar/pull/2473
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/server/licenses/LICENSE-JLine.txt b/distribution/server/licenses/LICENSE-JLine.txt
new file mode 100644
index 0000000000..9a34d43f5a
--- /dev/null
+++ b/distribution/server/licenses/LICENSE-JLine.txt
@@ -0,0 +1,29 @@
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index e0b1eb730e..5f0f07b680 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -366,27 +366,27 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-web-2.10.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.7.1.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.7.1.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.7.1.jar
-    - org.apache.bookkeeper-circe-checksum-4.7.1.jar
-    - org.apache.bookkeeper-statelib-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-api-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-common-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-server-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.7.1.jar
-    - org.apache.bookkeeper.http-http-server-4.7.1.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.1.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.1.jar
-    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-common-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.1-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.7.1.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.7.2.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.7.2.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.7.2.jar
+    - org.apache.bookkeeper-circe-checksum-4.7.2.jar
+    - org.apache.bookkeeper-statelib-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-api-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-common-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-server-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.7.2.jar
+    - org.apache.bookkeeper.http-http-server-4.7.2.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.2.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.2.jar
+    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-common-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
  * LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
@@ -457,6 +457,7 @@ BSD 3-clause "New" or "Revised" License
     - com.ea.agentloader-ea-agent-loader-1.0.2.jar -- licenses/LICENSE-EA-Agent-Loader.txt
  * Google auth library
     - com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
+ * JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE.JLine.txt
  * LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt
  * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt
 
diff --git a/pom.xml b/pom.xml
index 681f18e931..5dfe5ea12b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,8 +136,8 @@ flexible messaging model and an intuitive client API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.15</commons-compress.version>
 
-    <bookkeeper.version>4.7.1</bookkeeper.version>
-    <zookeeper.version>3.5.4-beta</zookeeper.version>
+    <bookkeeper.version>4.7.2</bookkeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
     <netty.version>4.1.22.Final</netty.version>
     <storm.version>1.0.5</storm.version>
     <jetty.version>9.3.11.v20160721</jetty.version>
@@ -1031,6 +1031,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>generated-site/**</exclude>
             <exclude>.github/*.md</exclude>
             <exclude>**/.idea/*</exclude>
+            <exclude>**/zk-3.5-test-data/*</exclude>
           </excludes>
           <mapping>
             <proto>JAVADOC_STYLE</proto>
@@ -1133,6 +1134,9 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>certificate-authority/index.txt</exclude>
             <exclude>certificate-authority/README.md</exclude>
 
+            <!-- Exclude ZK test data file -->
+            <exclude>**/zk-3.5-test-data/*</exclude>
+
             <!-- Python requirements files -->
             <exclude>**/requirements.txt</exclude>
 
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 51d4923335..fed49baddb 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -396,6 +396,7 @@ BSD License
     - asm-tree-6.0.jar
     - asm-util-6.0.jar
  * JLine
+   - jline-0.9.94.jar
    - jline-2.14.6.jar
  * ParaNamer Core
    - paranamer-2.7.jar
@@ -465,4 +466,5 @@ Public Domain (CC0) -- licenses/LICENSE-CC0.txt
 Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
     - bcpkix-jdk15on-1.55.jar
-    - bcprov-jdk15on-1.55.jar
\ No newline at end of file
+    - bcprov-jdk15on-1.55.jar
+
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 8e8d0a7c6a..5b49e47c52 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -72,6 +72,17 @@
       <version>${project.parent.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjrt</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjweaver</artifactId>
+    </dependency>
+
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
@@ -102,7 +113,53 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>aspectj-maven-plugin</artifactId>
+        <configuration>
+          <complianceLevel>1.8</complianceLevel>
+          <source>1.8</source>
+          <target>1.8</target>
+          <showWeaveInfo>true</showWeaveInfo>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>aspectj-maven-plugin</artifactId>
+                    <versionRange>[1.10,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
   </build>
 
 </project>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
deleted file mode 100644
index 45b9b78576..0000000000
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-@Slf4j
-public class FileTxnSnapLogWrapper extends FileTxnSnapLog {
-
-    public FileTxnSnapLogWrapper(FileTxnSnapLog src) throws IOException {
-        this(src.getDataDir(), src.getSnapDir());
-    }
-
-    public FileTxnSnapLogWrapper(File dataDir, File snapDir) throws IOException {
-        super(dataDir, snapDir);
-    }
-
-    @Override
-    public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
-        try {
-            return super.restore(dt, sessions, listener);
-        } catch (IOException e) {
-            if ("No snapshot found, but there are log entries. Something is broken!".equals(e.getMessage())) {
-                log.info("Ignoring exception for missing ZK db");
-                // Ignore error when snapshot is not found. This is needed when upgrading ZK from 3.4 to 3.5
-                // https://issues.apache.org/jira/browse/ZOOKEEPER-3056
-                save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
-
-                /* return a zxid of zero, since we the database is empty */
-                return 0;
-            } else {
-                throw e;
-            }
-        }
-    }
-}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f6ab92cb97..f7923e41eb 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -154,7 +154,7 @@ private void runZookeeper(int maxCC) throws IOException {
         try {
             // Allow all commands on ZK control port
             System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-            zks = new ZooKeeperServer(new FileTxnSnapLogWrapper(zkDataDir, zkDataDir));
+            zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
 
             serverFactory = new NIOServerCnxnFactory();
             serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), maxCC);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
new file mode 100644
index 0000000000..e7eb3cc80e
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.CreateTxnV0;
+import org.apache.zookeeper.txn.DeleteTxn;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.MultiTxn;
+import org.apache.zookeeper.txn.SetACLTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+
+@Aspect
+public class SerializeUtilsAspect {
+    @Around("execution(* org.apache.zookeeper.server.util.SerializeUtils.deserializeTxn(..))")
+    public Record wrapper(ProceedingJoinPoint joinPoint) throws IOException {
+        byte[] txnBytes = (byte[]) joinPoint.getArgs()[0];
+        TxnHeader hdr = (TxnHeader) joinPoint.getArgs()[1];
+
+        return deserializeTxn(txnBytes, hdr);
+    }
+
+    private static final int CREATE2 = 15;
+    private static final int CREATE_CONTAINER = 19;
+    private static final int DELETE_CONTAINER = 20;
+
+    /**
+     * Copied `deserializeTxn()` from ZK-3.4.13
+     *
+     * https://github.com/apache/zookeeper/blob/release-3.4.13/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java#L54
+     *
+     * With addition for handling `create2`, `createContainer` and `deleteContainer` transactions when downgrading from
+     * 3.5.x to 3.4.x
+     */
+    public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr)
+            throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
+        InputArchive ia = BinaryInputArchive.getArchive(bais);
+
+        hdr.deserialize(ia, "hdr");
+        bais.mark(bais.available());
+        Record txn = null;
+        switch (hdr.getType()) {
+        case OpCode.createSession:
+            // This isn't really an error txn; it just has the same
+            // format. The error represents the timeout
+            txn = new CreateSessionTxn();
+            break;
+        case OpCode.closeSession:
+            return null;
+        case OpCode.create:
+        case CREATE2: // create2 is treated as a create op in 3.5.x
+        case CREATE_CONTAINER: // createContainer can be treated as a regular create operation, since ZK 3.4 doens't
+                               // have support for auto cleaning the empty directories
+            txn = new CreateTxn();
+            break;
+        case OpCode.delete:
+        case DELETE_CONTAINER: // deleteContainer is treaded as regular delete in 3.5.x
+            txn = new DeleteTxn();
+            break;
+        case OpCode.setData:
+            txn = new SetDataTxn();
+            break;
+        case OpCode.setACL:
+            txn = new SetACLTxn();
+            break;
+        case OpCode.error:
+            txn = new ErrorTxn();
+            break;
+        case OpCode.multi:
+            txn = new MultiTxn();
+            break;
+        default:
+            throw new IOException("Unsupported Txn with type=%d" + hdr.getType());
+        }
+        if (txn != null) {
+            try {
+                txn.deserialize(ia, "txn");
+            } catch(EOFException e) {
+                // perhaps this is a V0 Create
+                if (hdr.getType() == OpCode.create) {
+                    CreateTxn create = (CreateTxn)txn;
+                    bais.reset();
+                    CreateTxnV0 createv0 = new CreateTxnV0();
+                    createv0.deserialize(ia, "txn");
+                    // cool now make it V1. a -1 parentCVersion will
+                    // trigger fixup processing in processTxn
+                    create.setPath(createv0.getPath());
+                    create.setData(createv0.getData());
+                    create.setAcl(createv0.getAcl());
+                    create.setEphemeral(createv0.getEphemeral());
+                    create.setParentCVersion(-1);
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return txn;
+    }
+
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index d18aea93e0..aac96ce84c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,18 +23,19 @@
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-@Slf4j
 public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
 
+    private static final Logger log = LoggerFactory.getLogger(ZookeeperBkClientFactoryImpl.class);
+
     private final OrderedExecutor executor;
 
     public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
index 74f3970ba4..7331471b2f 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
@@ -20,13 +20,13 @@
 
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.test.PortManager;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
@@ -79,6 +79,12 @@ void testZKCreationFailure() throws Exception {
         ZooKeeperClientFactory zkf = new ZookeeperClientFactoryImpl();
         CompletableFuture<ZooKeeper> zkFuture = zkf.create("invalid", SessionType.ReadWrite,
                 (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
-        assertTrue(zkFuture.isCompletedExceptionally());
+
+        try {
+            zkFuture.get(3, TimeUnit.SECONDS);
+            fail("Should have thrown exception");
+        } catch (TimeoutException e) {
+            // Expected
+        }
     }
 }
diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
index c7642f3e7b..24919bea78 100644
--- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
+++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
@@ -20,14 +20,9 @@
 
 import io.prometheus.client.Gauge;
 
-import java.util.Arrays;
-
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.aspectj.lang.JoinPoint;
-import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Pointcut;
 
@@ -42,17 +37,6 @@
     public void zkServerConstructorPointCut() {
     }
 
-    @Around("zkServerConstructorPointCut()")
-    public void zkServerConstructorBefore(ProceedingJoinPoint joinPoint) throws Throwable {
-        Object[] args = joinPoint.getArgs();
-        if (args[0] instanceof FileTxnSnapLog) {
-            // Wrap FileTxnSnapLog argument
-            args[0] = new FileTxnSnapLogWrapper((FileTxnSnapLog)args[0]);
-        }
-
-        joinPoint.proceed(args);
-    }
-
     @After("zkServerConstructorPointCut()")
     public void zkServerConstructor(JoinPoint joinPoint) throws Throwable {
         // ZooKeeperServer instance was created
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 8ff1e3231d..f78097b65c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -43,6 +43,7 @@
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ZKContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 
@@ -134,6 +135,15 @@ private PulsarCluster(PulsarClusterSpec spec) {
                 )
         );
 
+        spec.classPathVolumeMounts.entrySet().forEach(e -> {
+            zkContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+            proxyContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+
+            bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+            brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+            workerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+        });
+
     }
 
     public String getPlainTextServiceUrl() {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index c190421e20..f49200ecb4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -20,11 +20,14 @@
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
+
 import lombok.Builder;
 import lombok.Builder.Default;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
+
 import org.testcontainers.containers.GenericContainer;
 
 /**
@@ -99,4 +102,9 @@
     @Default
     boolean enableContainerLog = false;
 
+    /**
+     * Provide a map of paths (in the classpath) to mount as volumes inside the containers
+     */
+    @Builder.Default
+    Map<String, String> classPathVolumeMounts = new TreeMap<>();
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java
new file mode 100644
index 0000000000..6005aa727c
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.upgrade;
+
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.stream.Stream;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+/**
+ * Test downgrading ZK from 3.5.x to 3.4.x. This is part of the upgrade from Pulsar 2.1.0 to 2.1.1.
+ */
+@Slf4j
+public class PulsarZKDowngradeTest extends PulsarClusterTestBase {
+
+    protected static final int ENTRIES_PER_LEDGER = 1024;
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .clusterName(clusterName)
+                .classPathVolumeMounts(
+                        ImmutableMap.<String, String> builder()
+                                .put("zk-3.5-test-data", "/pulsar/data/zookeeper/version-2/version-2")
+                                .build())
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
+        String topicName = generateTopicName("testpubconsume", isPersistent);
+
+        int numMessages = 10;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("smoke-message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> m = consumer.receive();
+            assertEquals("smoke-message-" + i, m.getValue());
+        }
+
+    }
+}
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/log.1 b/tests/integration/src/test/resources/zk-3.5-test-data/log.1
new file mode 100644
index 0000000000..6d5ae524a2
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/log.1 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/log.85 b/tests/integration/src/test/resources/zk-3.5-test-data/log.85
new file mode 100644
index 0000000000..4ecb63be51
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/log.85 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0 b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0
new file mode 100644
index 0000000000..3e6deee02b
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84 b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84
new file mode 100644
index 0000000000..3520b95f62
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84 differ


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services