You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 17:58:19 UTC

[incubator-pulsar] branch master updated: Downgrading ZK to stable version 3.4.13 (#2473)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d734738  Downgrading ZK to stable version 3.4.13 (#2473)
d734738 is described below

commit d734738d439963a5ed0348ab778385f8c4e8d35b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 6 10:58:17 2018 -0700

    Downgrading ZK to stable version 3.4.13 (#2473)
    
    * Downgrading ZK to stable version 3.4.13
    
    * Added integration test for ZK server downgrade
    
    * Fixed ZookeeperClientFactoryImplTest.testZKCreationFailure
    
    * Fixed dependencies version in license file
    
    * Addressed comments
    
    * Fixed jline version in license file
    
    * There are 2 jline jars to count
---
 distribution/server/licenses/LICENSE-JLine.txt     |  29 +++++
 distribution/server/src/assemble/LICENSE.bin.txt   |  43 +++----
 pom.xml                                            |   8 +-
 pulsar-sql/presto-distribution/LICENSE             |   4 +-
 pulsar-zookeeper-utils/pom.xml                     |  57 +++++++++
 .../pulsar/zookeeper/FileTxnSnapLogWrapper.java    |  60 ----------
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |   2 +-
 .../pulsar/zookeeper/SerializeUtilsAspect.java     | 130 +++++++++++++++++++++
 .../zookeeper/ZookeeperBkClientFactoryImpl.java    |   7 +-
 .../zookeeper/ZookeeperClientFactoryImplTest.java  |  12 +-
 .../pulsar/zookeeper/ZooKeeperServerAspect.java    |  16 ---
 .../integration/topologies/PulsarCluster.java      |  10 ++
 .../integration/topologies/PulsarClusterSpec.java  |   8 ++
 .../integration/upgrade/PulsarZKDowngradeTest.java | 115 ++++++++++++++++++
 .../src/test/resources/zk-3.5-test-data/log.1      | Bin 0 -> 20480 bytes
 .../src/test/resources/zk-3.5-test-data/log.85     | Bin 0 -> 1024 bytes
 .../src/test/resources/zk-3.5-test-data/snapshot.0 | Bin 0 -> 424 bytes
 .../test/resources/zk-3.5-test-data/snapshot.84    | Bin 0 -> 10747 bytes
 18 files changed, 394 insertions(+), 107 deletions(-)

diff --git a/distribution/server/licenses/LICENSE-JLine.txt b/distribution/server/licenses/LICENSE-JLine.txt
new file mode 100644
index 0000000..9a34d43
--- /dev/null
+++ b/distribution/server/licenses/LICENSE-JLine.txt
@@ -0,0 +1,29 @@
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index e0b1eb7..5f0f07b 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -366,27 +366,27 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-web-2.10.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.7.1.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.7.1.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.7.1.jar
-    - org.apache.bookkeeper-circe-checksum-4.7.1.jar
-    - org.apache.bookkeeper-statelib-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-api-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-common-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-server-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.7.1.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.7.1.jar
-    - org.apache.bookkeeper.http-http-server-4.7.1.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.1.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.1.jar
-    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-common-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.1-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.7.1.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.7.1.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.7.2.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.7.2.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.7.2.jar
+    - org.apache.bookkeeper-circe-checksum-4.7.2.jar
+    - org.apache.bookkeeper-statelib-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-api-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-common-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-server-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.7.2.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.7.2.jar
+    - org.apache.bookkeeper.http-http-server-4.7.2.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.7.2.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.7.2.jar
+    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-common-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.7.2.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
  * LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
@@ -457,6 +457,7 @@ BSD 3-clause "New" or "Revised" License
     - com.ea.agentloader-ea-agent-loader-1.0.2.jar -- licenses/LICENSE-EA-Agent-Loader.txt
  * Google auth library
     - com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
+ * JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE.JLine.txt
  * LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt
  * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt
 
diff --git a/pom.xml b/pom.xml
index a35ca81..2352d5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,8 +136,8 @@ flexible messaging model and an intuitive client API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.15</commons-compress.version>
 
-    <bookkeeper.version>4.7.1</bookkeeper.version>
-    <zookeeper.version>3.5.4-beta</zookeeper.version>
+    <bookkeeper.version>4.7.2</bookkeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
     <netty.version>4.1.22.Final</netty.version>
     <storm.version>1.0.5</storm.version>
     <jetty.version>9.3.11.v20160721</jetty.version>
@@ -1032,6 +1032,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>generated-site/**</exclude>
             <exclude>.github/*.md</exclude>
             <exclude>**/.idea/*</exclude>
+            <exclude>**/zk-3.5-test-data/*</exclude>
           </excludes>
           <mapping>
             <proto>JAVADOC_STYLE</proto>
@@ -1134,6 +1135,9 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>certificate-authority/index.txt</exclude>
             <exclude>certificate-authority/README.md</exclude>
 
+            <!-- Exclude ZK test data file -->
+            <exclude>**/zk-3.5-test-data/*</exclude>
+
             <!-- Python requirements files -->
             <exclude>**/requirements.txt</exclude>
 
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index d7f2fc8..67adc68 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -399,6 +399,7 @@ BSD License
     - asm-tree-6.0.jar
     - asm-util-6.0.jar
  * JLine
+   - jline-0.9.94.jar
    - jline-2.14.6.jar
  * ParaNamer Core
    - paranamer-2.7.jar
@@ -468,4 +469,5 @@ Public Domain (CC0) -- licenses/LICENSE-CC0.txt
 Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
     - bcpkix-jdk15on-1.55.jar
-    - bcprov-jdk15on-1.55.jar
\ No newline at end of file
+    - bcprov-jdk15on-1.55.jar
+
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 8e8d0a7..5b49e47 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -73,6 +73,17 @@
     </dependency>
 
     <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjrt</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjweaver</artifactId>
+    </dependency>
+
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.parent.version}</version>
@@ -102,7 +113,53 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>aspectj-maven-plugin</artifactId>
+        <configuration>
+          <complianceLevel>1.8</complianceLevel>
+          <source>1.8</source>
+          <target>1.8</target>
+          <showWeaveInfo>true</showWeaveInfo>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>aspectj-maven-plugin</artifactId>
+                    <versionRange>[1.10,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
   </build>
 
 </project>
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
deleted file mode 100644
index 45b9b78..0000000
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/FileTxnSnapLogWrapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-@Slf4j
-public class FileTxnSnapLogWrapper extends FileTxnSnapLog {
-
-    public FileTxnSnapLogWrapper(FileTxnSnapLog src) throws IOException {
-        this(src.getDataDir(), src.getSnapDir());
-    }
-
-    public FileTxnSnapLogWrapper(File dataDir, File snapDir) throws IOException {
-        super(dataDir, snapDir);
-    }
-
-    @Override
-    public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
-        try {
-            return super.restore(dt, sessions, listener);
-        } catch (IOException e) {
-            if ("No snapshot found, but there are log entries. Something is broken!".equals(e.getMessage())) {
-                log.info("Ignoring exception for missing ZK db");
-                // Ignore error when snapshot is not found. This is needed when upgrading ZK from 3.4 to 3.5
-                // https://issues.apache.org/jira/browse/ZOOKEEPER-3056
-                save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
-
-                /* return a zxid of zero, since we the database is empty */
-                return 0;
-            } else {
-                throw e;
-            }
-        }
-    }
-}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index f6ab92c..f7923e4 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -154,7 +154,7 @@ public class LocalBookkeeperEnsemble {
         try {
             // Allow all commands on ZK control port
             System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-            zks = new ZooKeeperServer(new FileTxnSnapLogWrapper(zkDataDir, zkDataDir));
+            zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
 
             serverFactory = new NIOServerCnxnFactory();
             serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), maxCC);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
new file mode 100644
index 0000000..e7eb3cc
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/SerializeUtilsAspect.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.CreateTxnV0;
+import org.apache.zookeeper.txn.DeleteTxn;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.MultiTxn;
+import org.apache.zookeeper.txn.SetACLTxn;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+
+@Aspect
+public class SerializeUtilsAspect {
+    @Around("execution(* org.apache.zookeeper.server.util.SerializeUtils.deserializeTxn(..))")
+    public Record wrapper(ProceedingJoinPoint joinPoint) throws IOException {
+        byte[] txnBytes = (byte[]) joinPoint.getArgs()[0];
+        TxnHeader hdr = (TxnHeader) joinPoint.getArgs()[1];
+
+        return deserializeTxn(txnBytes, hdr);
+    }
+
+    private static final int CREATE2 = 15;
+    private static final int CREATE_CONTAINER = 19;
+    private static final int DELETE_CONTAINER = 20;
+
+    /**
+     * Copied `deserializeTxn()` from ZK-3.4.13
+     *
+     * https://github.com/apache/zookeeper/blob/release-3.4.13/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java#L54
+     *
+     * With addition for handling `create2`, `createContainer` and `deleteContainer` transactions when downgrading from
+     * 3.5.x to 3.4.x
+     */
+    public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr)
+            throws IOException {
+        final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
+        InputArchive ia = BinaryInputArchive.getArchive(bais);
+
+        hdr.deserialize(ia, "hdr");
+        bais.mark(bais.available());
+        Record txn = null;
+        switch (hdr.getType()) {
+        case OpCode.createSession:
+            // This isn't really an error txn; it just has the same
+            // format. The error represents the timeout
+            txn = new CreateSessionTxn();
+            break;
+        case OpCode.closeSession:
+            return null;
+        case OpCode.create:
+        case CREATE2: // create2 is treated as a create op in 3.5.x
+        case CREATE_CONTAINER: // createContainer can be treated as a regular create operation, since ZK 3.4 doens't
+                               // have support for auto cleaning the empty directories
+            txn = new CreateTxn();
+            break;
+        case OpCode.delete:
+        case DELETE_CONTAINER: // deleteContainer is treaded as regular delete in 3.5.x
+            txn = new DeleteTxn();
+            break;
+        case OpCode.setData:
+            txn = new SetDataTxn();
+            break;
+        case OpCode.setACL:
+            txn = new SetACLTxn();
+            break;
+        case OpCode.error:
+            txn = new ErrorTxn();
+            break;
+        case OpCode.multi:
+            txn = new MultiTxn();
+            break;
+        default:
+            throw new IOException("Unsupported Txn with type=%d" + hdr.getType());
+        }
+        if (txn != null) {
+            try {
+                txn.deserialize(ia, "txn");
+            } catch(EOFException e) {
+                // perhaps this is a V0 Create
+                if (hdr.getType() == OpCode.create) {
+                    CreateTxn create = (CreateTxn)txn;
+                    bais.reset();
+                    CreateTxnV0 createv0 = new CreateTxnV0();
+                    createv0.deserialize(ia, "txn");
+                    // cool now make it V1. a -1 parentCVersion will
+                    // trigger fixup processing in processTxn
+                    create.setPath(createv0.getPath());
+                    create.setData(createv0.getData());
+                    create.setAcl(createv0.getAcl());
+                    create.setEphemeral(createv0.getEphemeral());
+                    create.setParentCVersion(-1);
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return txn;
+    }
+
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index d18aea9..aac96ce 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,18 +23,19 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-@Slf4j
 public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
 
+    private static final Logger log = LoggerFactory.getLogger(ZookeeperBkClientFactoryImpl.class);
+
     private final OrderedExecutor executor;
 
     public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
index 74f3970..7331471 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperClientFactoryImplTest.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.zookeeper;
 
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.test.PortManager;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
@@ -79,6 +79,12 @@ public class ZookeeperClientFactoryImplTest {
         ZooKeeperClientFactory zkf = new ZookeeperClientFactoryImpl();
         CompletableFuture<ZooKeeper> zkFuture = zkf.create("invalid", SessionType.ReadWrite,
                 (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
-        assertTrue(zkFuture.isCompletedExceptionally());
+
+        try {
+            zkFuture.get(3, TimeUnit.SECONDS);
+            fail("Should have thrown exception");
+        } catch (TimeoutException e) {
+            // Expected
+        }
     }
 }
diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
index c7642f3..24919be 100644
--- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
+++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
@@ -20,14 +20,9 @@ package org.apache.pulsar.zookeeper;
 
 import io.prometheus.client.Gauge;
 
-import java.util.Arrays;
-
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.aspectj.lang.JoinPoint;
-import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.After;
-import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Pointcut;
 
@@ -42,17 +37,6 @@ public class ZooKeeperServerAspect {
     public void zkServerConstructorPointCut() {
     }
 
-    @Around("zkServerConstructorPointCut()")
-    public void zkServerConstructorBefore(ProceedingJoinPoint joinPoint) throws Throwable {
-        Object[] args = joinPoint.getArgs();
-        if (args[0] instanceof FileTxnSnapLog) {
-            // Wrap FileTxnSnapLog argument
-            args[0] = new FileTxnSnapLogWrapper((FileTxnSnapLog)args[0]);
-        }
-
-        joinPoint.proceed(args);
-    }
-
     @After("zkServerConstructorPointCut()")
     public void zkServerConstructor(JoinPoint joinPoint) throws Throwable {
         // ZooKeeperServer instance was created
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 8ff1e32..f78097b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ZKContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 
@@ -134,6 +135,15 @@ public class PulsarCluster {
                 )
         );
 
+        spec.classPathVolumeMounts.entrySet().forEach(e -> {
+            zkContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+            proxyContainer.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE);
+
+            bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+            brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+            workerContainers.values().forEach(c -> c.withClasspathResourceMapping(e.getKey(), e.getValue(), BindMode.READ_WRITE));
+        });
+
     }
 
     public String getPlainTextServiceUrl() {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index c190421..f49200e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -20,11 +20,14 @@ package org.apache.pulsar.tests.integration.topologies;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
+
 import lombok.Builder;
 import lombok.Builder.Default;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
+
 import org.testcontainers.containers.GenericContainer;
 
 /**
@@ -99,4 +102,9 @@ public class PulsarClusterSpec {
     @Default
     boolean enableContainerLog = false;
 
+    /**
+     * Provide a map of paths (in the classpath) to mount as volumes inside the containers
+     */
+    @Builder.Default
+    Map<String, String> classPathVolumeMounts = new TreeMap<>();
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java
new file mode 100644
index 0000000..6005aa7
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarZKDowngradeTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.upgrade;
+
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.stream.Stream;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+/**
+ * Test downgrading ZK from 3.5.x to 3.4.x. This is part of the upgrade from Pulsar 2.1.0 to 2.1.1.
+ */
+@Slf4j
+public class PulsarZKDowngradeTest extends PulsarClusterTestBase {
+
+    protected static final int ENTRIES_PER_LEDGER = 1024;
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .clusterName(clusterName)
+                .classPathVolumeMounts(
+                        ImmutableMap.<String, String> builder()
+                                .put("zk-3.5-test-data", "/pulsar/data/zookeeper/version-2/version-2")
+                                .build())
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
+        String topicName = generateTopicName("testpubconsume", isPersistent);
+
+        int numMessages = 10;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("smoke-message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> m = consumer.receive();
+            assertEquals("smoke-message-" + i, m.getValue());
+        }
+
+    }
+}
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/log.1 b/tests/integration/src/test/resources/zk-3.5-test-data/log.1
new file mode 100644
index 0000000..6d5ae52
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/log.1 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/log.85 b/tests/integration/src/test/resources/zk-3.5-test-data/log.85
new file mode 100644
index 0000000..4ecb63b
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/log.85 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0 b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0
new file mode 100644
index 0000000..3e6deee
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.0 differ
diff --git a/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84 b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84
new file mode 100644
index 0000000..3520b95
Binary files /dev/null and b/tests/integration/src/test/resources/zk-3.5-test-data/snapshot.84 differ