You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 01:00:07 UTC

[49/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
index e0b2810,0000000..1d9bd2d
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
@@@ -1,94 -1,0 +1,92 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.pdx.internal.PdxType;
 +
 +/**
 + * Retrieve the PDXType, given an integer PDX id, from a server.
 + * @author dsmith
 + * @since 6.6
 + */
 +public class GetPDXTypeByIdOp {
 +  /**
 +   * Ask a server, via the given pool, for the PdxType that corresponds to the integer PDX id {@code pdxId} and return it.
 +   * @param pool the pool to use to communicate with the server.
 +   */
 +  public static PdxType execute(ExecutablePool pool,
 +                             int pdxId)
 +  {
 +    AbstractOp op = new GetPDXTypeByIdOpImpl(pdxId);
 +    return (PdxType) pool.execute(op);
 +  }
 +                                                               
 +  private GetPDXTypeByIdOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXTypeByIdOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public GetPDXTypeByIdOpImpl(int pdxId) {
 +      super(MessageType.GET_PDX_TYPE_BY_ID, 1);
 +      getMessage().addIntPart(pdxId);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      return processObjResponse(msg, "getPDXTypeById");
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetPDXTypeById();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeByIdSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +    
-     //TODO - no idea what this mumbo jumbo means, but it's on
-     //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
index 2990192,0000000..262cb9a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
@@@ -1,112 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.util.Map;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.pdx.internal.PdxType;
 +
 +/**
 + * Retrieve all known PDX types.
 + * 
 + * @author bakera
 + * @since 7.0
 + */
 +public class GetPDXTypesOp {
 +
 +  public static Map<Integer, PdxType> execute(ExecutablePool pool) {
 +    AbstractOp op = new GetPDXTypesOpImpl();
 +    return (Map<Integer, PdxType>) pool.execute(op);
 +  }
 +                                                               
 +  private GetPDXTypesOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXTypesOpImpl extends AbstractOp {
 +    public GetPDXTypesOpImpl() {
 +      super(MessageType.GET_PDX_TYPES, 1);
 +      getMessage().addIntPart(0); // must have at least one part
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      int msgType = msg.getMessageType();
 +      if (msgType == MessageType.RESPONSE) {
 +        return (Map<Integer, PdxType>) part.getObject();
 +
 +      } else {
 +        if (msgType == MessageType.EXCEPTION) {
 +          String s = "While performing a remote " + "getPdxTypes";
 +          throw new ServerOperationException(s, (Throwable) part.getObject());
 +
 +        } else if (isErrorResponse(msgType)) {
 +          throw new ServerOperationException(part.getString());
 +
 +        } else {
 +          throw new InternalGemFireError("Unexpected message type "
 +              + MessageType.getString(msgType));
 +        }
 +      }
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return 0;
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +    }
 +    
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
index 59b99f0,0000000..27d80b1
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
@@@ -1,91 -1,0 +1,91 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Tell a server to become the primary host of a server-to-client queue
 + * @author darrel
 + * @since 5.7
 + */
 +public class MakePrimaryOp {
 +  /**
 +   * Tell the given server to become the primary host of a server-to-client queue
 +   * @param pool the pool to use to communicate with the server.
 +   * @param conn the connection to do the execution on
 +   * @param sentClientReady true if the client ready message has already been sent
 +   */
 +  public static void execute(ExecutablePool pool, Connection conn, boolean sentClientReady)
 +  {
 +    AbstractOp op = new MakePrimaryOpImpl(sentClientReady);
 +    pool.executeOn(conn, op);
 +  }
 +                                                               
 +  private MakePrimaryOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class MakePrimaryOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public MakePrimaryOpImpl(boolean sentClientReady) {
 +      super(MessageType.MAKE_PRIMARY, 1);
 +      getMessage().addBytesPart(new byte[] {(byte)(sentClientReady?0x01:0x00)});
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "makePrimary");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startMakePrimary();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endMakePrimarySend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endMakePrimary(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
index e0bc81b,0000000..e70d50a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
@@@ -1,97 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.ServerLocation;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Ping a server to see if it is still alive.
 + * @author darrel
 + * @since 5.7
 + */
 +public class PingOp {
 +  /**
 +   * Ping the specified server to see if it is still alive
 +   * @param pool the pool to use to communicate with the server.
 +   * @param server the server to do the execution on
 +   */
 +  public static void execute(ExecutablePool pool, ServerLocation server)
 +  {
 +    AbstractOp op = new PingOpImpl();
 +    pool.executeOn(server, op, false,false);
 +  }
 +                                                               
 +  private PingOp() {
 +    // no instances allowed
 +  }
 +
 +  static class PingOpImpl extends AbstractOp {
 +    
 +    private long startTime;
 +    
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public PingOpImpl() {
 +      super(MessageType.PING, 0);
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +      Message.messageType.set(null);
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      startTime = System.currentTimeMillis();
 +      getMessage().send(false);
 +      Message.messageType.set(MessageType.PING);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "ping");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startPing();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endPingSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endPing(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
index 0b65c56,0000000..4ee680a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
@@@ -1,101 -1,0 +1,101 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +import java.util.Iterator;
 +import java.util.List;
 +
 +/**
 + * Send the primary server acknowledgement on the events this client
 + * has received and processed from it.
 + * @author darrel
 + * @since 5.7
 + */
 +public class PrimaryAckOp {
 +  /**
 +   * Send the primary server acknowledgement on the events this client
 +   * has received and processed from it.
 +   * The op is executed on the given connection
 +   * by way of the given pool.
 +   * @param connection the connection on which the acknowledgement op is executed
 +   * @param pool the pool to use to communicate with the server.
 +   * @param events list of events to acknowledge
 +   */
 +  public static void execute(Connection connection, ExecutablePool pool,
 +                             List events)
 +  {
 +    AbstractOp op = new PrimaryAckOpImpl(events);
 +    pool.executeOn(connection, op);
 +  }
 +                                                               
 +  private PrimaryAckOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class PrimaryAckOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public PrimaryAckOpImpl(List events) {
 +      super(MessageType.PERIODIC_ACK, events.size());
 +      for (Iterator i = events.iterator(); i.hasNext();) {
 +        getMessage().addObjPart(i.next());
 +      }
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "primaryAck");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startPrimaryAck();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endPrimaryAckSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endPrimaryAck(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
index 53b8fa9,0000000..2747fa8
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
@@@ -1,124 -1,0 +1,124 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.util.Properties;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerConnectivityException;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.distributed.internal.ServerLocation;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +
 +public class ProxyCacheCloseOp {
 +
 +  public static Object executeOn(ServerLocation location, ExecutablePool pool,
 +      Properties securityProps, boolean keepAlive) {
 +    AbstractOp op = new ProxyCacheCloseOpImpl(pool, securityProps, keepAlive);
 +    return pool.executeOn(location, op);
 +  }
 +
 +  private ProxyCacheCloseOp() {
 +    // no instances allowed
 +  }
 +
 +  static class ProxyCacheCloseOpImpl extends AbstractOp {
 +
 +    public ProxyCacheCloseOpImpl(ExecutablePool pool, Properties securityProps,
 +        boolean keepAlive) {
 +      super(MessageType.REMOVE_USER_AUTH, 1);
-       getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++      getMessage().setMessageHasSecurePartFlag();
 +      getMessage().addBytesPart(keepAlive ? new byte[] {1} : new byte[] {0});
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
 +      HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
 +      byte[] secureBytes = null;
 +      hdos.writeLong(cnx.getConnectionID());
 +      Object userId = UserAttributes.userAttributes.get().getServerToId().get(cnx.getServer());
 +      if (userId == null) {
 +        // This will ensure that this op is retried on another server, unless
 +        // the retryCount is exhausted. Fix for Bug 41501
 +        throw new ServerConnectivityException(
 +            "Connection error while authenticating user");
 +      }
 +      hdos.writeLong((Long)userId);
 +      try {
 +        secureBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
 +            hdos.toByteArray());
 +      } finally {
 +        hdos.close();
 +      }
 +      getMessage().setSecurePart(secureBytes);
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      final int msgType = msg.getMessageType();
 +      if (msgType == MessageType.REPLY) {
 +        return part.getObject();
 +      }
 +      else if (msgType == MessageType.EXCEPTION) {
 +        String s = "While performing a remote proxy cache close";
 +        throw new ServerOperationException(s, (Throwable)part.getObject());
 +        // Get the exception toString part.
 +        // This was added for c++ thin client and not used in java
 +        // Part exceptionToStringPart = msg.getPart(1);
 +      }
 +      else if (isErrorResponse(msgType)) {
 +        throw new ServerOperationException(part.getString());
 +      }
 +      else {
 +        throw new InternalGemFireError("Unexpected message type "
 +            + MessageType.getString(msgType));
 +      }
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.REQUESTDATAERROR;
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGet();
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGet(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
index a003538,0000000..d2631fc
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
@@@ -1,91 -1,0 +1,91 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Tells the server we are ready to receive server-to-client events
 + * from durable subscriptions.
 + * @author darrel
 + * @since 5.7
 + */
 +public class ReadyForEventsOp {
 +  /**
 +   * Tells the primary server we are ready to receive server-to-client events
 +   * from durable subscriptions.
 +   * @param pool the pool to use to communicate with the server.
 +   * @param primary the queue connection to the primary server on which the op is executed
 +   */
 +  public static void execute(ExecutablePool pool, QueueConnectionImpl primary)
 +  {
 +    AbstractOp op = new ReadyForEventsOpImpl();
 +    pool.executeOn(primary, op);
 +  }
 +                                                               
 +  private ReadyForEventsOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class ReadyForEventsOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public ReadyForEventsOpImpl() {
 +      super(MessageType.CLIENT_READY, 1);
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "readyForEvents");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startReadyForEvents();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endReadyForEventsSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endReadyForEvents(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
index b2b975f,0000000..869ad64
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
@@@ -1,138 -1,0 +1,138 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.io.IOException;
 +
 +import com.gemstone.gemfire.DataSerializer;
 +import com.gemstone.gemfire.SerializationException;
 +import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
 +import com.gemstone.gemfire.internal.cache.ClientServerObserver;
 +import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
 +import com.gemstone.gemfire.internal.cache.EventID;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.util.BlobHelper;
 +
 +public class RegisterDataSerializersOp { // static entry points that run a REGISTER_DATASERIALIZERS op on a pool
 +
 +  public static void execute(ExecutablePool pool,
 +      DataSerializer[] dataSerializers, EventID eventId) { // variant taking DataSerializer instances
 +    AbstractOp op = new RegisterDataSerializersOpImpl(dataSerializers,
 +        eventId);
 +    pool.execute(op);
 +  }
 +  
 +  public static void execute(ExecutablePool pool,
 +      SerializerAttributesHolder[] holders, EventID eventId) { // variant taking holders that supply class name and id
 +    AbstractOp op = new RegisterDataSerializersOpImpl(holders,
 +        eventId);
 +    pool.execute(op);
 +  }
 +  
 +  private RegisterDataSerializersOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class RegisterDataSerializersOpImpl extends AbstractOp {
 +
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serializing a class name to a blob fails with an IOException
 +     */
 +    public RegisterDataSerializersOpImpl(DataSerializer[] dataSerializers,
 +        EventID eventId) {
 +      super(MessageType.REGISTER_DATASERIALIZERS, dataSerializers.length * 2 + 1); // matches the parts added below: two per serializer plus one for the event id
 +      for(int i = 0; i < dataSerializers.length; i++) {
 +        DataSerializer dataSerializer = dataSerializers[i];
 +         // Class.toString() is "class <name>" for a class; substring(6) drops the leading "class " prefix
 +        String className = dataSerializer.getClass().toString().substring(6);
 +        try {
 +          getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
 +        } catch (IOException ex) {
 +          throw new SerializationException("failed serializing object", ex);
 +        }
 +        getMessage().addIntPart(dataSerializer.getId());
 +      }
 +      getMessage().addBytesPart(eventId.calcBytes());
 +      // // CALLBACK FOR TESTING PURPOSE ONLY ////
 +      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
 +        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
 +        bo.beforeSendingToServer(eventId);
 +      }
 +   }
 +    
 +    /**
 +     * @throws SerializationException
 +     *           Thrown when serializing a holder's class name to a blob fails with an IOException.
 +     */
 +    public RegisterDataSerializersOpImpl(SerializerAttributesHolder[] holders,
 +        EventID eventId) {
 +      super(MessageType.REGISTER_DATASERIALIZERS, holders.length * 2 + 1); // matches the parts added below: two per holder plus one for the event id
 +      for (int i = 0; i < holders.length; i++) {
 +        try {
 +          getMessage().addBytesPart(
 +              BlobHelper.serializeToBlob(holders[i].getClassName()));
 +        } catch (IOException ex) {
 +          throw new SerializationException("failed serializing object", ex);
 +        }
 +        getMessage().addIntPart(holders[i].getId());
 +      }
 +      getMessage().addBytesPart(eventId.calcBytes());
 +      // // CALLBACK FOR TESTING PURPOSE ONLY ////
 +      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
 +        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
 +        bo.beforeSendingToServer(eventId);
 +      }
 +    }
 +
 +    @Override // Hands the reply to processAck under the op name "registerDataSerializers"; yields no result (null).
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "registerDataSerializers");
 +      return null;
 +    }
 +    
 +    @Override // No message type is reported as an error response by this op.
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startRegisterDataSerializers();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endRegisterDataSerializersSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endRegisterDataSerializers(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
index 93d3756,0000000..0d5a137
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
@@@ -1,180 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.io.IOException;
 +
 +import com.gemstone.gemfire.Instantiator;
 +import com.gemstone.gemfire.SerializationException;
 +import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
 +import com.gemstone.gemfire.internal.cache.ClientServerObserver;
 +import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
 +import com.gemstone.gemfire.internal.cache.EventID;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.util.BlobHelper;
 +
 +/**
 + * Register a bunch of instantiators on a server
 + * @author darrel
 + * @since 5.7
 + */
 +public class RegisterInstantiatorsOp {
 +  /**
 +   * Register a bunch of instantiators on a server
 +   * using connections from the given pool
 +   * to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   * @param instantiators the instantiators to register
 +   * @param eventId the id of this event
 +   */
 +  public static void execute(ExecutablePool pool,
 +                             Instantiator[] instantiators,
 +                             EventID eventId)
 +  {
 +    AbstractOp op = new RegisterInstantiatorsOpImpl(instantiators, eventId);
 +    pool.execute(op, Integer.MAX_VALUE);
 +  }
 +
 +  /**
 +   * Register a bunch of instantiators on a server using connections from the
 +   * given pool to communicate with the server.
 +   * 
 +   * @param pool
 +   *          the pool to use to communicate with the server.
 +   * @param holders
 +   *          {@link Instantiator}s and/or {@link InstantiatorAttributesHolder}s
 +   *          describing the instantiators to register (any other element type fails the cast)
 +   * @param eventId
 +   *          the id of this event
 +   */
 +  public static void execute(ExecutablePool pool,
 +      Object[] holders, EventID eventId) {
 +    AbstractOp op = new RegisterInstantiatorsOpImpl(holders,
 +        eventId);
 +    pool.execute(op, Integer.MAX_VALUE);
 +  }
 +
 +  private RegisterInstantiatorsOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class RegisterInstantiatorsOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serializing a class name to a blob fails with an IOException
 +     */
 +    public RegisterInstantiatorsOpImpl(Instantiator[] instantiators,
 +                                       EventID eventId) {
 +      super(MessageType.REGISTER_INSTANTIATORS, instantiators.length * 3 + 1); // matches the parts added below: three per instantiator plus one for the event id
 +      for(int i = 0; i < instantiators.length; i++) {
 +        Instantiator instantiator = instantiators[i];
 +         // Class.toString() is "class <name>" for a class; substring(6) drops that "class " prefix (the Object[] constructor uses getName() instead)
 +        String className = instantiator.getClass().toString().substring(6);
 +        String instantiatedClassName = instantiator.getInstantiatedClass().toString().substring(6);
 +        try {
 +          getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
 +          getMessage().addBytesPart(BlobHelper.serializeToBlob(instantiatedClassName));
 +        } catch (IOException ex) {
 +          throw new SerializationException("failed serializing object", ex);
 +        }
 +        getMessage().addIntPart(instantiator.getId());
 +      }
 +      getMessage().addBytesPart(eventId.calcBytes());
 +//     // // CALLBACK FOR TESTING PURPOSE ONLY ////
 +      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
 +        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
 +        bo.beforeSendingToServer(eventId);
 +      }
 +    }
 +
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException
 +     *           if serializing a class name to a blob fails with an IOException
 +     */
 +    public RegisterInstantiatorsOpImpl(Object[] holders,
 +        EventID eventId) {
 +      super(MessageType.REGISTER_INSTANTIATORS, holders.length * 3 + 1); // matches the parts added below: three per element plus one for the event id
 +      for (Object obj : holders) {
 +        String instantiatorClassName = null;
 +        String instantiatedClassName = null;
 +        int id = 0;
 +        if (obj instanceof Instantiator) { // live instantiator: names come from its Class objects
 +          instantiatorClassName = ((Instantiator)obj).getClass().getName();
 +          instantiatedClassName = ((Instantiator)obj).getInstantiatedClass()
 +              .getName();
 +          id = ((Instantiator)obj).getId();
 +        } else { // anything else is cast to InstantiatorAttributesHolder, which already carries the names
 +          instantiatorClassName = ((InstantiatorAttributesHolder)obj)
 +              .getInstantiatorClassName();
 +          instantiatedClassName = ((InstantiatorAttributesHolder)obj)
 +              .getInstantiatedClassName();
 +          id = ((InstantiatorAttributesHolder)obj).getId();
 +        }
 +        try {
 +          getMessage().addBytesPart(
 +              BlobHelper.serializeToBlob(instantiatorClassName));
 +          getMessage().addBytesPart(
 +              BlobHelper.serializeToBlob(instantiatedClassName));
 +        } catch (IOException ex) {
 +          throw new SerializationException("failed serializing object", ex);
 +        }
 +        getMessage().addIntPart(id);
 +      }
 +      getMessage().addBytesPart(eventId.calcBytes());
 +      // // // CALLBACK FOR TESTING PURPOSE ONLY ////
 +      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
 +        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
 +        bo.beforeSendingToServer(eventId);
 +      }
 +    }
 +
 +    @Override // Hands the reply to processAck under the op name "registerInstantiators"; yields no result (null).
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "registerInstantiators");
 +      return null;
 +    }
 +    @Override // No message type is reported as an error response by this op.
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startRegisterInstantiators();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endRegisterInstantiatorsSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endRegisterInstantiators(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
index 2793f32,0000000..6f01b96
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
@@@ -1,99 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Does a Rollback on the server
 + * @since 6.6
 + * @author sbawaska
 + */
 +public class RollbackOp {
 +
 +  /**
 +   * Does a rollback on the server for given transaction
 +   * @param pool the pool to use to communicate with the server.
 +   * @param txId the id of the transaction to rollback
 +   */
 +  public static void execute(ExecutablePool pool, int txId) {
 +    RollbackOpImpl op = new RollbackOpImpl(txId);
 +    pool.execute(op);
 +  }
 +  
 +  private RollbackOp() {
 +    // no instance allowed
 +  }
 +  
 +  private static class RollbackOpImpl extends AbstractOp {
 +    private int txId; // read only by toString() in this class
 +
 +    protected RollbackOpImpl(int txId) {
 +      super(MessageType.ROLLBACK, 1);
 +      getMessage().setTransactionId(txId); // the id is set on the message itself; this constructor adds no parts
 +      this.txId = txId;
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "Rollback(txId="+this.txId+")";
 +    }
 +
 +    @Override // Hands the reply to processAck under the op name "rollback"; yields no result (null).
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "rollback");
 +      return null;
 +    }
 +
 +    @Override // Only MessageType.EXCEPTION is reported as an error response.
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.EXCEPTION;
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startRollback();
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endRollbackSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endRollback(start, hasTimedOut(), hasFailed());
 +    }
 +     
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
index 6d69083,0000000..42cc225
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
@@@ -1,92 -1,0 +1,92 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Does a region size on a server
 + * @author gregp
 + * @since 6.6
 + */
 +public class SizeOp {
 +  /**
 +   * Does a region size on a server using connections from the given pool.
 +   * @param pool the pool to use to communicate with the server.
 +   * @param region the name of the region whose size is requested
 +   * @return the result of executing the op on the pool, cast to Integer
 +   */
 +  public static Integer execute(InternalPool pool,
 +                            String region)
 +  {
 +    AbstractOp op = new SizeOpImpl(region);
 +    return (Integer)pool.execute(op);
 +  }
 +                                                               
 +  private SizeOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class SizeOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public SizeOpImpl(String region) {
 +      super(MessageType.SIZE, 1); // one part is added below: the region name
 +      getMessage().addStringPart(region);
 +    }
 +
 +    @Override  
 +    protected Object processResponse(Message msg) throws Exception {
 +      
 +      return processObjResponse(msg, "size"); // the object decoded from the reply is this op's result
 +    }
 +    @Override  
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.SIZE_ERROR; // only SIZE_ERROR is reported as an error response
 +    }
 +    @Override  
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startSize();
 +    }
 +    @Override  
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endSizeSend(start, hasFailed());
 +    }
 +    @Override  
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endSize(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
index 1fecc7d,0000000..64ee66e
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
@@@ -1,93 -1,0 +1,93 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Indicates to the server that a transaction is
 + * failing over to this server. The server then
 + * performs the necessary bootstrapping for the tx.
 + * @author sbawaska
 + * @since 6.6
 + */
 +public class TXFailoverOp {
 +
 +  public static void execute(ExecutablePool pool, int txId) { // builds a TX_FAILOVER op for txId and runs it on the pool; the result is discarded
 +    pool.execute(new TXFailoverOpImpl(txId));
 +  }
 +  
 +  private TXFailoverOp() {
 +    // no instance
 +  }
 +  
 +  private static class TXFailoverOpImpl extends AbstractOp {
 +    int txId; // read only by toString() in this class
 +
 +    protected TXFailoverOpImpl(int txId) {
 +      super(MessageType.TX_FAILOVER, 1);
 +      getMessage().setTransactionId(txId); // the id is set on the message itself; this constructor adds no parts
 +      this.txId = txId;
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "TXFailoverOp(txId="+this.txId+")";
 +    }
 +
 +    @Override // Hands the reply to processAck under the op name "txFailover"; yields no result (null).
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "txFailover");
 +      return null;
 +    }
 +
 +    @Override // Only MessageType.EXCEPTION is reported as an error response.
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.EXCEPTION;
 +    }
 +
 +    @Override  
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startTxFailover();
 +    }
 +    @Override  
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endTxFailoverSend(start, hasFailed());
 +    }
 +    @Override  
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endTxFailover(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
index 48d66f2,0000000..34ecf4d
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
@@@ -1,163 -1,0 +1,163 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.GemFireException;
 +import com.gemstone.gemfire.cache.CommitConflictException;
 +import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.TXCommitMessage;
 +import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +
 +/**
 + * TXSynchronizationOp sends JTA beforeCompletion and afterCompletion
 + * messages to the server pool.
 + * 
 + * @author bruce
 + *
 + */
 +public class TXSynchronizationOp {
 +
 +  public static enum CompletionType {
 +    BEFORE_COMPLETION, AFTER_COMPLETION // NOTE(review): Impl puts type.ordinal() into the message, so the constant order is part of what gets sent
 +  }
 +
 +  /**
 +   * @param pool - the pool on which the op is executed
 +   * @param status - the status of an afterCompletion notification
 +   * @param txId - the transaction identifier
 +   * @param type - BEFORE_COMPLETION or AFTER_COMPLETION
 +   * @return the server's commit message; stays null for BEFORE_COMPLETION, since only the other branch of processResponse sets it
 +   */
 +  public static TXCommitMessage execute(InternalPool pool, int status, int txId, CompletionType type) {
 +    Impl impl = new Impl(status, txId, type);
 +    pool.execute(impl);
 +    return impl.tXCommitMessageResponse;
 +  }
 +  
 +  static class Impl extends AbstractOp {
 +
 +    private int status;
 +    private CompletionType type;
 +    TXCommitMessage tXCommitMessageResponse; // assigned in processResponse, and only on the non-BEFORE_COMPLETION path
 +
 +    /**
 +     * @param status added as a third int part only when type is AFTER_COMPLETION
 +     * @param type its ordinal is added as the first int part; txId follows as the second
 +     */
 +    public Impl(int status, int txId, CompletionType type) {
 +      super(MessageType.TX_SYNCHRONIZATION, (type==CompletionType.AFTER_COMPLETION)? 3 : 2);
 +      this.status = status;
 +      this.type = type;
 +      getMessage().addIntPart(type.ordinal());
 +      getMessage().addIntPart(txId);
 +      if (type == CompletionType.AFTER_COMPLETION) {
 +        getMessage().addIntPart(status);
 +      }
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "TXSynchronization(threadTxId=" + TXManagerImpl.getCurrentTXUniqueId()
 +      +"; "+this.type + "; status=" + this.status + ")";
 +    }
 +
 +    @Override // REPLY is accepted as-is; an EXCEPTION whose payload is a (Synchronization)CommitConflictException is rethrown directly; all else goes to super.processAck
 +  protected void processAck(Message msg, String opName)
 +    throws Exception
 +  {
 +    final int msgType = msg.getMessageType();
 +    if (msgType == MessageType.REPLY) {
 +      return;
 +    } else {
 +      Part part = msg.getPart(0);
 +      if (msgType == MessageType.EXCEPTION) {
 +        Throwable t = (Throwable) part.getObject();
 +        if (t instanceof CommitConflictException ||
 +            t instanceof SynchronizationCommitConflictException) {
 +          throw (GemFireException)t;
 +        }
 +      }
 +      super.processAck(msg, opName);
 +    }
 +  }
 +
 +    
 +    /* (non-Javadoc)
 +     * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#processResponse(com.gemstone.gemfire.internal.cache.tier.sockets.Message)
 +     */
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      if (this.type == CompletionType.BEFORE_COMPLETION) {
 +        try {
 +          processAck(msg, type.toString());
 +        } catch (ServerOperationException e) { // NOTE(review): unless the cause is a SynchronizationCommitConflictException, e is silently dropped — confirm intended
 +          if (e.getCause() instanceof SynchronizationCommitConflictException) {
 +            throw (SynchronizationCommitConflictException)e.getCause();
 +          }
 +        }
 +        return null;
 +      } else {
 +        TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, this.type.toString());
 +        this.tXCommitMessageResponse = rcs; // kept on the op so the static execute() can return it
 +        return rcs;
 +      }
 +    }
 +
 +    /* (non-Javadoc)
 +     * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#isErrorResponse(int)
 +     */
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.REQUESTDATAERROR; // only REQUESTDATAERROR is reported as an error response
 +    }
 +
 +    @Override  
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startTxSynchronization();
 +    }
 +    @Override  
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endTxSynchronizationSend(start, hasFailed());
 +    }
 +    @Override  
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endTxSynchronization(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override // No-op: this op does nothing with the connection or message passed in.
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override // Always answers false for this op.
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override // Calls clearMessageHasSecurePartFlag() on the message and then send(false); cnx is not used here.
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
index 636dd91,0000000..d2fdbe7
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
@@@ -1,357 -1,0 +1,357 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
 +
 +import java.io.IOException;
 +import java.util.regex.Matcher;
 +
++import com.gemstone.gemfire.internal.hll.ICardinality;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.BytesWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.compress.CompressionCodec;
 +import org.apache.hadoop.io.compress.GzipCodec;
 +import org.apache.hadoop.io.compress.Lz4Codec;
 +import org.apache.hadoop.io.compress.SnappyCodec;
 +
 +import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
 +import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
 +import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
 +import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
 +import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
 +import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import org.apache.hadoop.hbase.util.FSUtils;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +/**
 + * Abstract class for {@link Hoplog} with common functionality
 + */
 +public abstract class AbstractHoplog implements Hoplog {
 +  // supplies the FileSystem used by rename(), delete() and the size/time lookup
 +  protected final FSProvider fsProvider;
 +  
 +  // path of the oplog file; qualified in initialize() and replaced by rename()
 +  protected volatile Path path;
 +  // descriptor built from the file name of path; rebuilt by rename()
 +  private volatile HoplogDescriptor hfd;
 +  // configuration taken from the FileSystem in initialize()
 +  protected Configuration conf;
 +  // statistics object handed to the constructor
 +  protected SortedOplogStatistics stats;
 +  // cached file status values: null until loaded by initHoplogSizeTimeInfo()
 +  // and reset to null by delete()
 +  protected Long hoplogModificationTime;
 +  protected Long hoplogSize;
 +
 +  // assigned through setReaderActivityListener()
 +  protected HoplogReaderActivityListener readerListener;
 +  
 +  // logger instance
 +  protected static final Logger logger = LogService.getLogger();
 +  
 +  // NOTE(review): this field is static yet reassigned by every constructor and
 +  // by rename(), so it holds the name of the most recently constructed or
 +  // renamed hoplog, which is not necessarily this instance
 +  protected static String logPrefix;
 +  // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
 +  AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
 +      throws IOException {
 +    logPrefix = "<" + filePath.getName() + "> ";
 +    this.fsProvider = new FSProvider(inputFS);
 +    initialize(filePath, stats, inputFS);
 +  }
 +
 +  public AbstractHoplog(HDFSStoreImpl store, Path filePath,
 +      SortedOplogStatistics stats) throws IOException {
 +    logPrefix = "<" + filePath.getName() + "> ";
 +    this.fsProvider = new FSProvider(store);
 +    initialize(filePath, stats, store.getFileSystem());
 +  }
 +
 +  private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
 +    this.conf = fs.getConf();
 +    this.stats = stats;
 +    this.path = fs.makeQualified(path);
 +    this.hfd = new HoplogDescriptor(this.path.getName());
 +  }
 +  
 +  /**
 +   * Closes this hoplog; the behavior is supplied by the concrete subclass.
 +   * Called by {@link #rename(String)} and {@link #delete()}.
 +   */
 +  @Override
 +  public abstract void close() throws IOException;
 +
 +  /** Returns a reader for this hoplog; supplied by the concrete subclass. */
 +  @Override
 +  public abstract HoplogReader getReader() throws IOException;
 +
 +  /**
 +   * Creates a writer for this hoplog; supplied by the concrete subclass.
 +   *
 +   * @param keys presumably the expected number of keys - TODO confirm against
 +   *        the implementations
 +   */
 +  @Override
 +  public abstract HoplogWriter createWriter(int keys) throws IOException;
 +
 +  /**
 +   * Closes this hoplog; supplied by the concrete subclass.
 +   *
 +   * @param clearCache presumably whether cached data should be dropped as
 +   *        well - TODO confirm against the implementations
 +   */
 +  @Override
 +  public abstract void close(boolean clearCache) throws IOException;
 +
 +  /**
 +   * Stores the listener in {@code readerListener}, replacing any previous one.
 +   *
 +   * @param listener the listener to keep
 +   */
 +  @Override
 +  public void setReaderActivityListener(HoplogReaderActivityListener listener) {
 +    readerListener = listener;
 +  }
 +  
 +  /**
 +   * @return the file name held by this hoplog's current descriptor
 +   */
 +  @Override
 +  public String getFileName() {
 +    final HoplogDescriptor descriptor = this.hfd;
 +    return descriptor.getFileName();
 +  }
 +  
 +  /**
 +   * Orders hoplogs by comparing their descriptors.
 +   *
 +   * @param o the hoplog to compare with; it is cast to {@code AbstractHoplog}
 +   * @return the result of {@link HoplogDescriptor#compareTo(HoplogDescriptor)}
 +   */
 +  public final int compareTo(Hoplog o) {
 +    final HoplogDescriptor mine = hfd;
 +    final HoplogDescriptor theirs = ((AbstractHoplog) o).hfd;
 +    return mine.compareTo(theirs);
 +  }
 +
 +  /**
 +   * Base implementation that provides no estimate.
 +   *
 +   * @return always {@code null} in this class
 +   * @throws IOException never thrown by this implementation
 +   */
 +  @Override
 +  public ICardinality getEntryCountEstimate() throws IOException {
 +    final ICardinality noEstimate = null;
 +    return noEstimate;
 +  }
 +  
 +  /**
 +   * Renames the hoplog file to {@code name} inside its current parent
 +   * directory, closes this hoplog, and then repoints {@code path}, the
 +   * descriptor and the log prefix at the new file name.
 +   *
 +   * @param name new file name (not a full path)
 +   * @throws IOException if the file system rename or the close fails
 +   */
 +  @Override
 +  public synchronized void rename(String name) throws IOException {
 +    if (logger.isDebugEnabled()) {
 +      // name is passed as a parameter so that any "{}" it contains cannot be
 +      // mistaken for a placeholder
 +      logger.debug("{}Renaming hoplog to {}", logPrefix, name);
 +    }
 +    final Path newPath = new Path(path.getParent(), name);
 +    // newPath is already resolved against the parent directory, so it is used
 +    // as the target directly instead of being wrapped in another Path.
 +    // NOTE(review): the boolean result of FileSystem.rename is still ignored;
 +    // confirm whether a false result should abort the rename.
 +    fsProvider.getFS().rename(path, newPath);
 +
 +    // close the old reader and let the new one get created lazily
 +    close();
 +
 +    // update path to point to the new path
 +    path = newPath;
 +    final String newFileName = newPath.getName();
 +    this.hfd = new HoplogDescriptor(newFileName);
 +    logPrefix = "<" + newFileName + "> ";
 +  }
 +  
 +  /**
 +   * Closes this hoplog, forgets the cached modification time and size, and
 +   * removes the file (non-recursively) from the file system.
 +   *
 +   * @throws IOException if closing or the file system delete fails
 +   */
 +  @Override
 +  public synchronized void delete() throws IOException {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("{}Deleting hoplog", logPrefix);
 +    }
 +    close();
 +    hoplogModificationTime = null;
 +    hoplogSize = null;
 +    final boolean recursive = false;
 +    fsProvider.getFS().delete(path, recursive);
 +  }
 +
 +  /**
 +   * Returns the modification time of the hoplog file, loading it from the
 +   * file system on first use.
 +   *
 +   * @return the cached modification time
 +   * @throws IllegalStateException if no modification time is available, which
 +   *         happens when the file status could not be obtained
 +   */
 +  @Override
 +  public long getModificationTimeStamp() {
 +    initHoplogSizeTimeInfo();
 +
 +    // Read the field once: delete() can reset it to null concurrently, and a
 +    // second read after the null check could then fail while unboxing.
 +    final Long modificationTime = hoplogModificationTime;
 +
 +    // modification time will not be null if this hoplog exists. Otherwise
 +    // invocation of this method is invalid
 +    if (modificationTime == null) {
 +      throw new IllegalStateException(
 +          "Modification time is not available for hoplog " + path);
 +    }
 +
 +    return modificationTime;
 +  }
 +
 +  /**
 +   * Returns the length of the hoplog file, loading it from the file system on
 +   * first use.
 +   *
 +   * @return the cached file length
 +   * @throws IllegalStateException if no size is available, which happens when
 +   *         the file status could not be obtained
 +   */
 +  @Override
 +  public long getSize() {
 +    initHoplogSizeTimeInfo();
 +
 +    // Read the field once: delete() can reset it to null concurrently, and a
 +    // second read after the null check could then fail while unboxing.
 +    final Long size = hoplogSize;
 +
 +    // size will not be null if this hoplog exists. Otherwise
 +    // invocation of this method is invalid
 +    if (size == null) {
 +      throw new IllegalStateException("Size is not available for hoplog " + path);
 +    }
 +
 +    return size;
 +  }
 +  
 +  private synchronized void initHoplogSizeTimeInfo() {
 +    if (hoplogSize != null && hoplogModificationTime != null) {
 +      // time and size info is already initialized. no work needed here
 +      return;
 +    }
 +
 +    try {
 +      FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
 +      if (filesInfo != null && filesInfo.length == 1) {
 +        this.hoplogModificationTime = filesInfo[0].getModificationTime();
 +        this.hoplogSize = filesInfo[0].getLen();
 +      }
 +      // TODO else condition may happen if user deletes hoplog from the file system.
 +    } catch (IOException e) {
 +      logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
 +      throw new HDFSIOException(
 +          LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
 +    }
 +  }
 +  /**
 +   * Convenience overload that creates the writer without an explicit version,
 +   * i.e. passes {@code null} for the version argument of the four-argument
 +   * overload.
 +   *
 +   * @param path file to write
 +   * @param conf configuration handed to the writer
 +   * @param logger logger used for debug output
 +   * @return SequenceFile.Writer
 +   * @throws IOException if the writer cannot be created
 +   */
 +  public static SequenceFile.Writer getSequenceFileWriter(Path path,
 +      Configuration conf, Logger logger) throws IOException {
 +    final Version unspecifiedVersion = null;
 +    return getSequenceFileWriter(path, conf, logger, unspecifiedVersion);
 +  }
 +  
 +  /**
 +   * Creates a sequence file writer whose keys and values are
 +   * {@code BytesWritable}, using the compression selected by
 +   * {@code withCompression} and a metadata entry recording the ordinal of the
 +   * GemFire version.
 +   *
 +   * @param path file to write
 +   * @param conf configuration handed to the writer
 +   * @param logger logger used for debug output
 +   * @param version - is being used only for testing. Should be passed as null
 +   *        for other purposes, in which case {@code Version.CURRENT} is used.
 +   * @return SequenceFile.Writer
 +   * @throws IOException if the writer cannot be created
 +   */
 +  public static SequenceFile.Writer getSequenceFileWriter(Path path,
 +    Configuration conf, Logger logger, Version version) throws IOException {
 +    final Option fileOption = SequenceFile.Writer.file(path);
 +    final Option keyOption = SequenceFile.Writer.keyClass(BytesWritable.class);
 +    final Option valueOption = SequenceFile.Writer.valueClass(BytesWritable.class);
 +    final Option compressionOption = withCompression(logger);
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("{}Started creating hoplog " + path, logPrefix);
 +    }
 +
 +    final Version effectiveVersion = (version == null) ? Version.CURRENT : version;
 +
 +    // Create a metadata option with the gemfire version, for future versioning
 +    // of the key and value format
 +    final SequenceFile.Metadata versionMetadata = new SequenceFile.Metadata();
 +    final Text versionKey = new Text(Meta.GEMFIRE_VERSION.name());
 +    final Text versionValue = new Text(String.valueOf(effectiveVersion.ordinal()));
 +    versionMetadata.set(versionKey, versionValue);
 +    final Option metadataOption = SequenceFile.Writer.metadata(versionMetadata);
 +
 +    return SequenceFile.createWriter(conf, fileOption, keyOption, valueOption,
 +        compressionOption, metadataOption);
 +  }
 +  
 +  private static Option withCompression(Logger logger) {
 +    String prop = System.getProperty(HoplogConfig.COMPRESSION);
 +    if (prop != null) {
 +      CompressionCodec codec;
 +      if (prop.equalsIgnoreCase("SNAPPY")) {
 +        codec = new SnappyCodec();
 +      } else if (prop.equalsIgnoreCase("LZ4")) {
 +        codec = new Lz4Codec();
 +      } else if (prop.equals("GZ")) {
 +        codec = new GzipCodec();
 +      } else {
 +        throw new IllegalStateException("Unsupported codec: " + prop);
 +      }
 +      if (logger.isDebugEnabled())
 +        logger.debug("{}Using compression codec " + codec, logPrefix);
 +      return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
 +    }
 +    return SequenceFile.Writer.compression(CompressionType.NONE, null);
 +  }
 +  
 +  public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
 +     private final String fileName;
 +     private final String bucket;
 +     private final int sequence;
 +     private final long timestamp;
 +     private final String extension;
 +     
 +     HoplogDescriptor(final String fileName) {
 +       this.fileName = fileName;
 +       final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
 +       final boolean matched = matcher.find();
 +       assert matched;
 +       this.bucket = matcher.group(1);
 +       this.sequence = Integer.valueOf(matcher.group(3));
 +       this.timestamp = Long.valueOf(matcher.group(2)); 
 +       this.extension = matcher.group(4);
 +     }
 +     
 +     public final String getFileName() {
 +       return fileName;
 +     }
 +     
 +     @Override
 +     public boolean equals(Object o) {
 +       if (this == o) {
 +         return true;
 +       }
 +       
 +       if (!(o instanceof HoplogDescriptor)) {
 +         return false;
 +       }
 +       
 +       final HoplogDescriptor other = (HoplogDescriptor)o;
 +       // the two files should belong to same bucket
 +       assert this.bucket.equals(other.bucket);
 +       
 +       // compare sequence first
 +       if (this.sequence != other.sequence) {
 +         return false;
 +       }
 +       
 +       // sequence is same, compare timestamps
 +       if (this.timestamp != other.timestamp) {
 +         return false;
 +       }
 +       
 +       return extension.equals(other.extension);
 +     }
 +
 +    @Override
 +    public int compareTo(HoplogDescriptor o) {
 +      if (this == o) {
 +        return 0;
 +      }
 +      
 +      // the two files should belong to same bucket
 +      assert this.bucket.equals(o.bucket);
 +      
 +      // compare sequence first
 +      if (sequence > o.sequence) {
 +        return -1;
 +      } else if (sequence < o.sequence) {
 +        return 1;
 +      }
 +      
 +      // sequence is same, compare timestamps
 +      if(timestamp > o.timestamp) {
 +        return -1; 
 +      } else if (timestamp < o.timestamp) {
 +        return 1;
 +      }
 +      
 +      //timestamp is the same, compare the file extension. It's
 +      //possible a major compaction and minor compaction could finish
 +      //at the same time and create the same timestamp and sequence number
 +      //it doesn't matter which file we look at first in that case.
 +      return extension.compareTo(o.extension);
 +    }
 +     
 +     
 +  }
 +  
 +  /**
 +   * Hands out the {@code FileSystem} for a hoplog. Exactly one of {@code fs}
 +   * and {@code store} is set: either a fixed file system, or a store that is
 +   * asked for its file system on every call.
 +   */
 +  protected static final class FSProvider {
 +    final FileSystem fs;
 +    final HDFSStoreImpl store;
 +    
 +    // THIS METHOD IS FOR TESTING ONLY
 +    FSProvider(FileSystem fs) {
 +      this.fs = fs;
 +      this.store = null;
 +    }
 +    
 +    FSProvider(HDFSStoreImpl store) {
 +      this.store = store;
 +      fs = null;
 +    }
 +    
 +    /**
 +     * @return the store's file system when a store is present, otherwise the
 +     *         fixed file system given at construction
 +     * @throws IOException if the store fails to provide its file system
 +     */
 +    public FileSystem getFS() throws IOException {
 +      if (store != null) {
 +        return store.getFileSystem();
 +      }
 +      return fs;
 +    }
 +
 +    /**
 +     * Asks the store to check and clear its file system and returns the
 +     * store's cached file system afterwards.
 +     *
 +     * @return the store's cached file system, or the fixed file system when
 +     *         this provider has no store
 +     */
 +    public FileSystem checkFileSystem() {
 +      if (store == null) {
 +        // Built from a raw FileSystem: there is no store to consult, so fall
 +        // back to the fixed file system, as getFS() does, instead of failing
 +        // with a NullPointerException.
 +        return fs;
 +      }
 +      store.checkAndClearFileSystem();
 +      return store.getCachedFileSystem();
 +    }
 +  }
 +}