You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:38 UTC

[050/100] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
index 0ae0437,0000000..28c1e0f
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
@@@ -1,436 -1,0 +1,436 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.net.SocketTimeoutException;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerConnectivityException;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.distributed.internal.DistributionManager;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 +import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
 +import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 +
 +/**
 + * Represents an operation that can be performed in a client by sending
 + * a message to a server.
 + * @since 5.7
 + */
 +public abstract class AbstractOp implements Op {
 +  
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  private final Message msg;
 +
 +  protected AbstractOp(int msgType, int msgParts) {
 +    this.msg = new Message(msgParts, Version.CURRENT);
 +    getMessage().setMessageType(msgType);
 +  }
 +
 +  /**
 +   * Returns the message that this op will send to the server
 +   */
 +  protected Message getMessage() {
 +    return this.msg;
 +  }
 +  protected void initMessagePart() {
 +    
 +  }
 +  /**
 +   * Sets the transaction id on the message
 +   */
 +  private void setMsgTransactionId() {
 +    if (participateInTransaction()
 +        && getMessage().getTransactionId() == TXManagerImpl.NOTX) {
 +      getMessage().setTransactionId(TXManagerImpl.getCurrentTXUniqueId());
 +    }
 +  }
 +
 +  /**
 +   * Attempts to send this operation's message out on the
 +   * given connection
 +   * @param cnx the connection to use when sending
 +   * @throws Exception if the send fails
 +   */
 +  protected void attemptSend(Connection cnx) throws Exception {
 +    setMsgTransactionId();
 +    if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Sending op={} using {}", getShortClassName(), cnx);
 +      }
 +    }
 +    getMessage().setComms(cnx.getSocket(), cnx.getInputStream(),
 +        cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
 +    try {
 +      sendMessage(cnx);
 +    } finally {
 +      getMessage().unsetComms();
 +    }
 +  }
 +
 +  /** returns the class name w/o package information.  useful in logging */
 +  public String getShortClassName() {
 +    String cname = getClass().getName();
 +    return cname.substring(getClass().getPackage().getName().length()+1);
 +  }
 +
 +  /**
 +   * New implementations of AbstractOp should override this method if the
 +   * implementation should be excluded from client authentication. e.g.
 +   * PingOp#sendMessage(Connection cnx)
 +   * 
 +   * @see AbstractOp#needsUserId()
 +   * @see AbstractOp#processSecureBytes(Connection, Message)
 +   * @see ServerConnection#updateAndGetSecurityPart()
 +   */
 +  protected void sendMessage(Connection cnx) throws Exception {
 +    if (cnx.getServer().getRequiresCredentials()) {
 +      // Security is enabled on client as well as on server
-       getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++      getMessage().setMessageHasSecurePartFlag();
 +      HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
 +      long userId = -1;
 +
 +      if (UserAttributes.userAttributes.get() == null) { // single user mode
 +        userId = cnx.getServer().getUserId();
 +      } else { // multi user mode
 +        Object id = UserAttributes.userAttributes.get().getServerToId().get(
 +            cnx.getServer());
 +        if (id == null) {
 +          // This will ensure that this op is retried on another server, unless
 +          // the retryCount is exhausted. Fix for Bug 41501
 +          throw new ServerConnectivityException(
 +              "Connection error while authenticating user"); // TODO:LOG hdos is not closed??
 +        }
 +        userId = (Long)id;
 +      }
 +      try {
 +        hdos.writeLong(cnx.getConnectionID());
 +        hdos.writeLong(userId);
 +        getMessage().setSecurePart(
 +            ((ConnectionImpl)cnx).getHandShake().encryptBytes(
 +                hdos.toByteArray()));
 +      } finally {
 +        hdos.close();
 +      }
 +    }
 +    getMessage().send(false);
 +  }
 +
 +  /**
 +   * Attempts to read a response to this operation by reading it from the
 +   * given connection, and returning it.
 +   * @param cnx the connection to read the response from
 +   * @return the result of the operation
 +   *         or <code>null</code> if the operation has no result.
 +   * @throws Exception if the execute failed
 +   */
 +  protected Object attemptReadResponse(Connection cnx) throws Exception {
 +    Message msg = createResponseMessage();
 +    if (msg != null) {
 +      msg.setComms(cnx.getSocket(), cnx.getInputStream(),
 +          cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
 +      if (msg instanceof ChunkedMessage) {
 +        try {
 +          return processResponse(msg, cnx);
 +        } finally {
 +          msg.unsetComms();
 +          // TODO (ashetkar) Handle the case when we fail to read the connection id.
 +          processSecureBytes(cnx, msg);
 +        }
 +      } else {
 +        try {
 +          msg.recv();
 +        } finally {
 +          msg.unsetComms();
 +          processSecureBytes(cnx, msg);
 +        }
 +        return processResponse(msg, cnx);
 +      }
 +    } else {
 +      return null;
 +    }
 +  }
 +
 +  /**
 +   * New implementations of AbstractOp should override this method if the
 +   * implementation should be excluded from client authentication. e.g.
 +   * PingOp#processSecureBytes(Connection cnx, Message message)
 +   * 
 +   * @see AbstractOp#sendMessage(Connection)
 +   * @see AbstractOp#needsUserId()
 +   * @see ServerConnection#updateAndGetSecurityPart()
 +   */
 +  protected void processSecureBytes(Connection cnx, Message message)
 +      throws Exception {
 +    if (cnx.getServer().getRequiresCredentials()) {
 +      if (!message.isSecureMode()) {
 +        // This can be seen during shutdown
 +        if (logger.isDebugEnabled()) {
 +          logger.trace(LogMarker.BRIDGE_SERVER, "Response message from {} for {} has no secure part.", cnx, this);
 +        }
 +        return;
 +      }
 +      byte[] partBytes = message.getSecureBytes();
 +      if (partBytes == null) {
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("Response message for {} has no bytes in secure part.", this);
 +        }
 +        return;
 +      }
 +      byte[] bytes = ((ConnectionImpl)cnx).getHandShake().decryptBytes(
 +          partBytes);
 +      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
 +      cnx.setConnectionID(dis.readLong());
 +    }
 +  }
 +
 +  /**
 +   * By default just create a normal one part msg.
 +   * Subclasses can override this.
 +   */
 +  protected Message createResponseMessage() {
 +    return new Message(1, Version.CURRENT);
 +  }
 +  
 +  protected Object processResponse(Message m, Connection con) throws Exception {
 +    return processResponse(m);
 +  }
 +  
 +  /**
 +   * Processes the given response message returning the result, if any,
 +   * of the processing.
 +   * @return the result of processing the response; null if no result
 +   * @throws Exception if response could not be processed or
 +   * we received a response with a server exception.
 +   */
 +  protected abstract Object processResponse(Message msg) throws Exception;
 +
 +  /**
 +   * Return true if <code>msgType</code> indicates the operation
 +   * had an error on the server.
 +   */
 +  protected abstract boolean isErrorResponse(int msgType);
 +  /**
 +   * Process a response that contains an ack.
 +   * @param msg the message containing the response
 +   * @param opName text describing this op
 +   * @throws Exception if response could not be processed or
 +   * we received a response with a server exception.
 +   */
 +  protected void processAck(Message msg, String opName)
 +    throws Exception
 +  {
 +    final int msgType = msg.getMessageType();
 +    if (msgType == MessageType.REPLY) {
 +      return;
 +    } else {
 +      Part part = msg.getPart(0);
 +      if (msgType == MessageType.EXCEPTION) {
 +        String s = ": While performing a remote " + opName;
 +        Throwable t = (Throwable) part.getObject();
 +        if (t instanceof PutAllPartialResultException) {
 +          throw (PutAllPartialResultException)t;
 +        } else {
 +          throw new ServerOperationException(s, t);
 +        }
 +        // Get the exception toString part.
 +        // This was added for c++ thin client and not used in java
 +        // Part exceptionToStringPart = msg.getPart(1);
 +      } else if (isErrorResponse(msgType)) {
 +        throw new ServerOperationException(part.getString());
 +      } else {
 +        throw new InternalGemFireError("Unexpected message type "
 +                                       + MessageType.getString(msgType));
 +      }
 +    }
 +  }
 +  /**
 +   * Process a response that contains a single Object result.
 +   * @param msg the message containing the response
 +   * @param opName text describing this op
 +   * @return the result of the response
 +   * @throws Exception if response could not be processed or
 +   * we received a response with a server exception.
 +   */
 +  protected final Object processObjResponse(Message msg, String opName)
 +    throws Exception
 +  {
 +    Part part = msg.getPart(0);
 +    final int msgType = msg.getMessageType();
 +    if (msgType == MessageType.RESPONSE) {
 +      return part.getObject();
 +    } else {
 +      if (msgType == MessageType.EXCEPTION) {
 +        String s = "While performing a remote " + opName;
 +        throw new ServerOperationException(s, (Throwable) part.getObject());
 +        // Get the exception toString part.
 +        // This was added for c++ thin client and not used in java
 +        // Part exceptionToStringPart = msg.getPart(1);
 +      } else if (isErrorResponse(msgType)) {
 +        throw new ServerOperationException(part.getString());
 +      } else {
 +        throw new InternalGemFireError("Unexpected message type "
 +                                       + MessageType.getString(msgType));
 +      }
 +    }
 +  }
 +  /**
 +   * Used by subclasses who get chunked responses.
 +   */
 +  public interface ChunkHandler {
 +    /**
 +     * This method will be called once for every incoming chunk
 +     * @param msg the current chunk to handle
 +     */
 +    public void handle(ChunkedMessage msg) throws Exception;
 +  }
 +  /**
 +   * Process a chunked response, passing each received chunk to the callback.
 +   * @param msg the message containing the response
 +   * @param opName text describing this op
 +   * @param callback used to handle each chunk's data
 +   * @throws Exception if response could not be processed or
 +   * we received a response with a server exception.
 +   */
 +  protected final void processChunkedResponse(ChunkedMessage msg, String opName, ChunkHandler callback)
 +    throws Exception
 +  {
 +    msg.readHeader();
 +    final int msgType = msg.getMessageType();
 +    if (msgType == MessageType.RESPONSE) {
 +      do {
 +        msg.receiveChunk();
 +        callback.handle(msg);
 +      } while (!msg.isLastChunk());
 +    } else {
 +      if (msgType == MessageType.EXCEPTION) {
 +        msg.receiveChunk();
 +        Part part = msg.getPart(0);
 +        String s = "While performing a remote " + opName;
 +        throw new ServerOperationException(s, (Throwable) part.getObject());
 +        // Get the exception toString part.
 +        // This was added for c++ thin client and not used in java
 +        // Part exceptionToStringPart = msg.getPart(1);
 +      } else if (isErrorResponse(msgType)) {
 +        msg.receiveChunk();
 +        Part part = msg.getPart(0);
 +        throw new ServerOperationException(part.getString());
 +      } else {
 +        throw new InternalGemFireError("Unexpected message type "
 +                                       + MessageType.getString(msgType));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Set to true if this attempt failed
 +   */
 +  protected boolean failed;
 +  /**
 +   * Set to true if this attempt timed out
 +   */
 +  protected boolean timedOut;
 +
 +  /* (non-Javadoc)
 +   * @see com.gemstone.gemfire.cache.client.internal.Op#attempt(com.gemstone.gemfire.cache.client.internal.Connection)
 +   */
 +  public Object attempt(Connection cnx) throws Exception {
 +    this.failed = true;
 +    this.timedOut = false;
 +    long start = startAttempt(cnx.getStats());
 +    try {
 +      try {
 +        attemptSend(cnx);
 +        this.failed = false;
 +      } finally {
 +        endSendAttempt(cnx.getStats(), start);
 +      }
 +      this.failed = true;
 +      try {
 +        Object result = attemptReadResponse(cnx);
 +        this.failed = false;
 +        return result;
 +      } catch (SocketTimeoutException ste) {
 +        this.failed = false;
 +        this.timedOut = true;
 +        throw ste;
 +      } catch(Exception e) {
 +        throw e;
 +      }
 +    } finally {
 +      endAttempt(cnx.getStats(), start);
 +    }
 +  }
 +  protected final boolean hasFailed() {
 +    return this.failed;
 +  }
 +  protected final boolean hasTimedOut() {
 +    return this.timedOut;
 +  }
 +  protected abstract long startAttempt(ConnectionStats stats);
 +  protected abstract void endSendAttempt(ConnectionStats stats, long start);
 +  protected abstract void endAttempt(ConnectionStats stats, long start);
 +
 +  /**
 +   * New implementations of AbstractOp should override this method to return
 +   * false if the implementation should be excluded from client authentication.
 +   * e.g. PingOp#needsUserId()
 +   * <P/>
 +   * Also, such an operation's <code>MessageType</code> must be added in the
 +   * 'if' condition in {@link ServerConnection#updateAndGetSecurityPart()}
 +   * 
 +   * @return boolean
 +   * @see AbstractOp#sendMessage(Connection)
 +   * @see AbstractOp#processSecureBytes(Connection, Message)
 +   * @see ServerConnection#updateAndGetSecurityPart()
 +   */
 +  protected boolean needsUserId() {
 +    return true;
 +  }
 +  
 +  /**
 +   * Subclasses of AbstractOp should override this method to return
 +   * false if this message should not participate in any existing transaction
 +   * @return true if the message should participate in transaction
 +   */
 +  protected boolean participateInTransaction() {
 +    return true;
 +  }
 +
 +  @Override
 +  public boolean useThreadLocalConnection() {
 +    return true;
 +  }
 +  
 +  public boolean isGatewaySenderOp() {
 +    return false;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
index b88948d,0000000..ff2bf1c
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
@@@ -1,98 -1,0 +1,98 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.pdx.internal.EnumInfo;
 +
 +/**
 + * Push a PDX Enum id to other servers.
 + * @author darrel
 + * @since 6.6.2
 + */
 +public class AddPDXEnumOp {
 +  /**
 +   * Send a PDX enum and its id to a server
 +   * using connections from the given pool
 +   * to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   */
 +  public static void execute(ExecutablePool pool, int id,
 +                             EnumInfo ei)
 +  {
 +    AbstractOp op = new AddPdxEnumOpImpl(id, ei);
 +    pool.execute(op);;
 +  }
 +                                                               
 +  private AddPDXEnumOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class AddPdxEnumOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public AddPdxEnumOpImpl(int id, EnumInfo ei) {
 +      super(MessageType.ADD_PDX_ENUM, 2);
 +      getMessage().addObjPart(ei);
 +      getMessage().addIntPart(id);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "addPDXEnum");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startAddPdxType(); /* use the addPdxType stats instead of adding more stats */
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endAddPdxTypeSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endAddPdxType(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +    
 +    //TODO - no idea what this mumbo jumbo means, but it's on
 +    //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
index 92dd246,0000000..9fbc674
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
@@@ -1,98 -1,0 +1,96 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.pdx.internal.PdxType;
 +
 +/**
 + * Add a PdxType to a server.
 + * @author dsmith
 + * @since 6.6
 + */
 +public class AddPDXTypeOp {
 +  /**
 +   * Send a PdxType and its id to a server
 +   * using connections from the given pool
 +   * to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   */
 +  public static void execute(ExecutablePool pool, int id,
 +                             PdxType type)
 +  {
 +    AbstractOp op = new AddPDXTypeOpImpl(id, type);
 +    pool.execute(op);
 +  }
 +                                                               
 +  private AddPDXTypeOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class AddPDXTypeOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public AddPDXTypeOpImpl(int id, PdxType type) {
 +      super(MessageType.ADD_PDX_TYPE, 2);
 +      getMessage().addObjPart(type);
 +      getMessage().addIntPart(id);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      processAck(msg, "addPDXType");
 +      return null;
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startAddPdxType();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endAddPdxTypeSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endAddPdxType(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +    
-     //TODO - no idea what this mumbo jumbo means, but it's on
-     //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
index b0f0cec,0000000..b03c7b9
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
@@@ -1,312 -1,0 +1,312 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +/**
 + * 
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.util.Properties;
 +
 +import com.gemstone.gemfire.DataSerializer;
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.DistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.ServerLocation;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.command.PutUserCredentials;
 +import com.gemstone.gemfire.internal.logging.InternalLogWriter;
 +import com.gemstone.gemfire.security.AuthenticationFailedException;
 +import com.gemstone.gemfire.security.AuthenticationRequiredException;
 +import com.gemstone.gemfire.security.NotAuthorizedException;
 +
 +/**
 + * Authenticates this client (or a user) on a server. This op ideally should get
 + * executed once-per-server.
 + * 
 + * When multiuser-authentication is set to false, this op gets executed
 + * immediately after a client-to-server connection is established.
 + * 
 + * When multiuser-authentication is set to true, this op gets executed
 + * before the user attempts to perform an op whose
 + * {@link AbstractOp#needsUserId()} returns true.
 + * 
 + * @author ashetkar
 + * @see PutUserCredentials
 + * @see ProxyCache
 + * @since 6.5
 + */
 +public class AuthenticateUserOp {
 +
 +  /**
 +   * Sends the auth credentials to the server. Used in single user mode of
 +   * authentication.
 +   * 
 +   * @param con
 +   *          The connection to use for this operation.
 +   * @param pool
 +   *          The connection pool to use for this operation.
 +   * @return Object unique user-id.
 +   */
 +  public static Object executeOn(Connection con, ExecutablePool pool) {
 +    AbstractOp op = new AuthenticateUserOpImpl(con, pool);
 +    return pool.executeOn(con, op);
 +  }
 +
 +  /**
 +   * Sends the auth credentials to the server for a particular user. Used in
 +   * multiple user mode of authentication.
 +   * 
 +   * @param location
 +   *          The ServerLocation instance whose connection instance will be used
 +   *          to perform the operation.
 +   * @param pool
 +   *          The connection pool to use for this operation.
 +   * @param securityProps
 +   *          The security properties of the user being authenticated. They are
 +   *          handed to the op, which passes them to
 +   *          {@code HandShake.getCredentials} when the message is sent.
 +   * @return Object unique user-id, as returned by
 +   *         {@code pool.executeOn(location, op)}.
 +   */
 +  public static Object executeOn(ServerLocation location, ExecutablePool pool,
 +      Properties securityProps) {
 +    AbstractOp op = new AuthenticateUserOpImpl(pool, securityProps);
 +    return pool.executeOn(location, op);
 +  }
 +
 +  private AuthenticateUserOp() {
 +    // no instances allowed
 +  }
 +
 +  static class AuthenticateUserOpImpl extends AbstractOp {
 +
 +    private Properties securityProperties = null;
 +    private boolean needsServerLocation = false;
 +
 +    public AuthenticateUserOpImpl(Connection con, ExecutablePool pool) {
 +      super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
 +      byte[] credentialBytes = null;
 +      // TODO this is not a valid way to create a member ID
 +      DistributedMember server = new InternalDistributedMember(con.getSocket()
 +          .getInetAddress(), con.getSocket().getPort(), false);
 +      DistributedSystem sys = InternalDistributedSystem.getConnectedInstance();
 +      String authInitMethod = sys.getProperties().getProperty(
 +          DistributionConfig.SECURITY_CLIENT_AUTH_INIT_NAME);
 +      Properties tmpSecurityProperties = sys.getSecurityProperties();
 +
 +      // LOG: following passes the DS API LogWriters into the security API
 +      Properties credentials = HandShake.getCredentials(authInitMethod,
 +          tmpSecurityProperties, server, false, (InternalLogWriter)sys.getLogWriter(), (InternalLogWriter)sys
 +              .getSecurityLogWriter());
 +      
-       getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++      getMessage().setMessageHasSecurePartFlag();
 +      HeapDataOutputStream heapdos = new HeapDataOutputStream(Version.CURRENT);
 +      try {
 +        DataSerializer.writeProperties(credentials, heapdos);
 +        credentialBytes = ((ConnectionImpl)con).getHandShake()
 +            .encryptBytes(heapdos.toByteArray());
 +      } catch (Exception e) {
 +        throw new ServerOperationException(e);
 +      } finally {
 +        heapdos.close();
 +      }
 +      getMessage().addBytesPart(credentialBytes);
 +    }
 +
 +    public AuthenticateUserOpImpl(ExecutablePool pool, Properties securityProps) {
 +      this(pool, securityProps, false);
 +    }
 +
 +    public AuthenticateUserOpImpl(ExecutablePool pool, Properties securityProps, boolean needsServer) {
 +      super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
 +      this.securityProperties = securityProps;
 +      this.needsServerLocation = needsServer;
 +
-       getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++      getMessage().setMessageHasSecurePartFlag();
 +    }
 +
 +    /**
 +     * Builds the secure part of the authentication request and sends it.
 +     * <p>
 +     * The connection ID is written to a scratch stream, encrypted through the
 +     * connection's handshake and set as the message's secure part. When this
 +     * op carries explicit security properties, the credentials derived from
 +     * them are serialized, encrypted and added as a bytes part first.
 +     *
 +     * @param cnx
 +     *          the connection to send on; it is cast to {@code ConnectionImpl}
 +     *          to reach its handshake.
 +     * @throws Exception
 +     *           if obtaining, serializing or encrypting the credentials
 +     *           fails, or if the send itself fails.
 +     */
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
 +      byte[] secureBytes = null;
 +      HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
 +      // Everything that can throw while hdos is open sits inside this try, so
 +      // the stream is closed even when credential handling fails.
 +      try {
 +        hdos.writeLong(cnx.getConnectionID());
 +        if (this.securityProperties != null) {
 +          byte[] credentialBytes = null;
 +          // TODO this is not a valid way to create a member ID
 +          DistributedMember server = new InternalDistributedMember(cnx
 +              .getSocket().getInetAddress(), cnx.getSocket().getPort(), false);
 +          DistributedSystem sys = InternalDistributedSystem
 +              .getConnectedInstance();
 +          String authInitMethod = sys.getProperties().getProperty(
 +              DistributionConfig.SECURITY_CLIENT_AUTH_INIT_NAME);
 +
 +          Properties credentials = HandShake.getCredentials(authInitMethod,
 +              this.securityProperties, server, false, (InternalLogWriter)sys.getLogWriter(), (InternalLogWriter)sys
 +                  .getSecurityLogWriter());
 +          HeapDataOutputStream heapdos = new HeapDataOutputStream(Version.CURRENT);
 +          try {
 +            DataSerializer.writeProperties(credentials, heapdos);
 +            credentialBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
 +                heapdos.toByteArray());
 +          } finally {
 +            heapdos.close();
 +          }
 +          getMessage().addBytesPart(credentialBytes);
 +        }
 +        secureBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
 +            hdos.toByteArray());
 +      } finally {
 +        hdos.close();
 +      }
 +      // As before, the secure part is attached and the message sent only
 +      // after the scratch stream has been closed.
 +      getMessage().setSecurePart(secureBytes);
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    public Object attempt(Connection cnx) throws Exception {
 +      if (cnx.getServer().getRequiresCredentials()) {
 +        return super.attempt(cnx);
 +      } else {
 +        return null;
 +      }
 +    }
 +
 +    /**
 +     * Reads the server's reply on the given connection and hands it to
 +     * {@link #processResponse(Connection, Message)}.
 +     * <p>
 +     * A {@code ChunkedMessage} is processed while the connection's streams are
 +     * still attached to it; any other message is first received in full with
 +     * {@code recv()} and processed afterwards. In both cases the streams are
 +     * detached and {@code processSecureBytes} is called in a {@code finally}
 +     * block, so that cleanup also runs when reading or processing throws.
 +     *
 +     * @param cnx
 +     *          the connection to read the response from.
 +     * @return the result of {@code processResponse(cnx, msg)}, or {@code null}
 +     *         when {@code createResponseMessage()} returns no message.
 +     */
 +    @Override
 +    protected Object attemptReadResponse(Connection cnx) throws Exception {
 +      Message msg = createResponseMessage();
 +      if (msg != null) {
 +        msg.setComms(cnx.getSocket(), cnx.getInputStream(),
 +            cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
 +        if (msg instanceof ChunkedMessage) {
 +          // Chunked replies are processed before the comms are detached.
 +          try {
 +            return processResponse(cnx, msg);
 +          } finally {
 +            msg.unsetComms();
 +            processSecureBytes(cnx, msg);
 +          }
 +        } else {
 +          // Plain replies are received completely, then processed once the
 +          // comms have been detached.
 +          try {
 +            msg.recv();
 +          } finally {
 +            msg.unsetComms();
 +            processSecureBytes(cnx, msg);
 +          }
 +          return processResponse(cnx, msg);
 +        }
 +      } else {
 +        return null;
 +      }
 +    }
 +
 +    /**
 +     * Interprets the server's reply to the authentication request.
 +     * <p>
 +     * On a {@code RESPONSE} the first part holds a byte array: an empty array
 +     * marks the server as not requiring credentials, otherwise the bytes are
 +     * decrypted through the connection's handshake and a long user id is read
 +     * from them. On an {@code EXCEPTION} the server-side throwable is rethrown
 +     * wrapped in an exception of the matching security type.
 +     *
 +     * @param cnx
 +     *          the connection the reply arrived on.
 +     * @param msg
 +     *          the reply message.
 +     * @return the user id ({@code -1} when the payload was empty), or an
 +     *         {@code Object[]} of server location and user id when this op
 +     *         was created with {@code needsServer} set.
 +     * @throws Exception
 +     *           the wrapped server exception, a
 +     *           {@code ServerOperationException} for an error response, or an
 +     *           {@code InternalGemFireError} for an unexpected message type.
 +     */
 +    protected Object processResponse(Connection cnx, Message msg) throws Exception {
 +      final Part firstPart = msg.getPart(0);
 +      final int msgType = msg.getMessageType();
 +
 +      if (msgType == MessageType.RESPONSE) {
 +        long userId = -1;
 +        final byte[] payload = (byte[])firstPart.getObject();
 +        if (payload.length != 0) {
 +          cnx.getServer().setRequiresCredentials(true);
 +          final byte[] plain = ((ConnectionImpl)cnx).getHandShake().decryptBytes(payload);
 +          final DataInputStream in = new DataInputStream(new ByteArrayInputStream(plain));
 +          userId = in.readLong();
 +        } else {
 +          cnx.getServer().setRequiresCredentials(false);
 +        }
 +        if (!this.needsServerLocation) {
 +          return userId;
 +        }
 +        return new Object[] {cnx.getServer(), userId};
 +      }
 +
 +      if (msgType == MessageType.EXCEPTION) {
 +        // Only part 0 is used; the exception toString part (part 1) was added
 +        // for the c++ thin client and is not read in java.
 +        final Object serverFailure = firstPart.getObject();
 +        final String context = "While performing a remote authenticate";
 +        if (serverFailure instanceof AuthenticationFailedException) {
 +          final AuthenticationFailedException authFailure =
 +            (AuthenticationFailedException)serverFailure;
 +          if ("REPLY_REFUSED".equals(authFailure.getMessage())) {
 +            throw new AuthenticationFailedException(context, authFailure.getCause());
 +          }
 +          throw new AuthenticationFailedException(context, authFailure);
 +        }
 +        if (serverFailure instanceof AuthenticationRequiredException) {
 +          throw new AuthenticationRequiredException(context,
 +              (AuthenticationRequiredException)serverFailure);
 +        }
 +        if (serverFailure instanceof NotAuthorizedException) {
 +          throw new NotAuthorizedException(context, (NotAuthorizedException)serverFailure);
 +        }
 +        throw new ServerOperationException(context, (Throwable)serverFailure);
 +      }
 +
 +      if (isErrorResponse(msgType)) {
 +        throw new ServerOperationException(firstPart.getString());
 +      }
 +
 +      throw new InternalGemFireError("Unexpected message type "
 +          + MessageType.getString(msgType));
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.REQUESTDATAERROR;
 +    }
 +
 +    /**
 +     * Marks the start of an attempt of this op in the connection statistics.
 +     * <p>
 +     * NOTE(review): the attempt is recorded through {@code startGet()}, and
 +     * the matching end methods of this class use {@code endGetSend} /
 +     * {@code endGet}, so authentication is counted under the "get"
 +     * statistics. Presumably there is no dedicated stat for this op; confirm
 +     * that this is intentional.
 +     *
 +     * @param stats
 +     *          the statistics object to record into.
 +     * @return the value returned by {@code stats.startGet()}.
 +     */
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGet();
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGet(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      return null;
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
index 943f0a3,0000000..7de38fe
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
@@@ -1,95 -1,0 +1,95 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Tell a server that a connection is being closed
 + * @author darrel
 + * @since 5.7
 + */
 +public class CloseConnectionOp {
 +  /**
 +   * Tell a server that a connection is being closed
 +   * @param con the connection that is being closed
 +   * @param keepAlive whether to keep the proxy alive on the server
 +   */
 +  public static void execute(Connection con, boolean keepAlive)
 +    throws Exception
 +  {
 +    AbstractOp op = new CloseConnectionOpImpl(keepAlive);
 +    con.execute(op);
 +  }
 +                                                               
 +  private CloseConnectionOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class CloseConnectionOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public CloseConnectionOpImpl(boolean keepAlive)  {
 +      super(MessageType.CLOSE_CONNECTION, 1);
 +      getMessage().addRawPart(new byte[]{(byte)(keepAlive?1:0)}, false);
 +    }
 +    @Override  
 +    protected Message createResponseMessage() {
 +      // no response is sent
 +      return null;
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @Override  
 +    protected Object processResponse(Message msg) throws Exception {
 +      throw new IllegalStateException("should never be called");
 +    }
 +    @Override  
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override  
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startCloseCon();
 +    }
 +    @Override  
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endCloseConSend(start, hasFailed());
 +    }
 +    @Override  
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endCloseCon(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
index e04a466,0000000..c9c6dd7
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
@@@ -1,109 -1,0 +1,109 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.TXCommitMessage;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +/**
 + * Does a commit on a server
 + * @author gregp
 + * @since 6.6
 + */
 +public class CommitOp {
 +  /**
 +   * Does a commit on a server using connections from the given pool
 +   * to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   * @param txId the id of the transaction to commit; it is set as the
 +   *        transaction id of the request message.
 +   * @return the {@code TXCommitMessage} the op stored while processing the
 +   *         server's response.
 +   */
 +  public static TXCommitMessage execute(ExecutablePool pool,int txId)
 +  {
 +    CommitOpImpl op = new CommitOpImpl(txId);
 +    pool.execute(op);
 +    return op.getTXCommitMessageResponse();
 +  }
 +                                                               
 +  private CommitOp() {
 +    // no instances allowed
 +  }
 +  
 +    
 +  private static class CommitOpImpl extends AbstractOp {
 +    private int txId;
 +    
 +    private TXCommitMessage tXCommitMessageResponse = null;
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public CommitOpImpl(int txId) {
 +      super(MessageType.COMMIT, 1);
 +      getMessage().setTransactionId(txId);
 +      this.txId = txId;
 +    }
 +
 +    public TXCommitMessage getTXCommitMessageResponse() {
 +      return tXCommitMessageResponse;
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "TXCommit(txId="+this.txId+")";
 +    }
 +
 +    /**
 +     * Extracts the commit result from the server's reply and remembers it so
 +     * that {@link #getTXCommitMessageResponse()} can return it later.
 +     *
 +     * @param msg
 +     *          the reply message, decoded via
 +     *          {@code processObjResponse(msg, "commit")}.
 +     * @return the decoded {@code TXCommitMessage}.
 +     */
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, "commit");
 +      // The null check is an assert, so it is only enforced when the JVM runs
 +      // with assertions enabled; otherwise a null response is stored and
 +      // returned as-is.
 +      assert rcs != null : "TxCommit response was null";
 +      this.tXCommitMessageResponse = rcs;
 +      return rcs;
 +    }
 +     
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }    
 +    
 +    @Override  
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.EXCEPTION;
 +    }
 +    @Override  
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startCommit();
 +    }
 +    @Override  
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endCommitSend(start, hasFailed());
 +    }
 +    @Override  
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endCommit(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
index f88d1e9,0000000..8bae6ff
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
@@@ -1,171 -1,0 +1,171 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +
 +/**
 + * Retrieves {@link ClientPartitionAdvisor} for the specified PartitionedRegion from
 + * one of the servers
 + * 
 + * @author Suranjan Kumar
 + * @author Yogesh Mahajan
 + * 
 + * @since 6.5
 + */
 +public class GetClientPRMetaDataOp {
 +
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  private GetClientPRMetaDataOp() {
 +    // no instances allowed
 +  }
 +
 +  public static void execute(ExecutablePool pool, String regionFullPath,
 +      ClientMetadataService cms) {
 +    AbstractOp op = new GetClientPRMetaDataOpImpl(regionFullPath, cms);
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("GetClientPRMetaDataOp#execute : Sending GetClientPRMetaDataOp Message: {} to server using pool: {}", op.getMessage(), pool);
 +    }
 +    pool.execute(op);
 +  }
 +
 +  static class GetClientPRMetaDataOpImpl extends AbstractOp {
 +
 +    String regionFullPath = null;
 +
 +    ClientMetadataService cms = null;
 +
 +    public GetClientPRMetaDataOpImpl(String regionFullPath, ClientMetadataService cms) {
 +      super(MessageType.GET_CLIENT_PR_METADATA, 1);
 +      this.regionFullPath = regionFullPath;
 +      this.cms = cms;
 +      getMessage().addStringPart(regionFullPath);
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    /**
 +     * Handles the server's reply to a client PR metadata request.
 +     * <p>
 +     * For {@code RESPONSE_CLIENT_PR_METADATA} every part of the message is
 +     * read as a list of bucket server locations. Each non-empty list updates
 +     * the region's {@link ClientPartitionAdvisor} and every colocated advisor
 +     * for the bucket id taken from the list's first entry; afterwards the
 +     * metadata is marked stable.
 +     *
 +     * @param msg
 +     *          the reply message.
 +     * @return always {@code null}; the result is applied to the
 +     *         {@code ClientMetadataService} as a side effect.
 +     * @throws ServerOperationException
 +     *           for a {@code GET_CLIENT_PR_METADATA_ERROR} or
 +     *           {@code EXCEPTION} reply.
 +     * @throws InternalGemFireError
 +     *           for any other message type.
 +     */
 +    @SuppressWarnings("unchecked")
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      switch (msg.getMessageType()) {
 +        case MessageType.GET_CLIENT_PR_METADATA_ERROR:
 +          String errorMsg = msg.getPart(0).getString();
 +          if (logger.isDebugEnabled()) {
 +            logger.debug(errorMsg);
 +          }
 +          throw new ServerOperationException(errorMsg);
 +        case MessageType.RESPONSE_CLIENT_PR_METADATA:
 +          final boolean isDebugEnabled = logger.isDebugEnabled();
 +          if (isDebugEnabled) {
 +            // The message type is passed as a parameter so that it fills the
 +            // {} placeholder instead of being appended after it.
 +            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type : {}", MessageType.getString(msg.getMessageType()));
 +          }
 +          int numParts = msg.getNumberOfParts();
 +          ClientPartitionAdvisor advisor = cms
 +              .getClientPartitionAdvisor(regionFullPath);
 +          for (int i = 0; i < numParts; i++) {
 +            Object result = msg.getPart(i).getObject();
 +            List<BucketServerLocation66> locations = (List<BucketServerLocation66>)result;
 +            if (!locations.isEmpty()) {
 +              // The bucket id is taken from the first entry of the list.
 +              int bucketId = locations.get(0).getBucketId();
 +              if (isDebugEnabled) {
 +                logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations);
 +              }
 +              advisor.updateBucketServerLocations(bucketId, locations, cms);
 +
 +              Set<ClientPartitionAdvisor> cpas = cms
 +                  .getColocatedClientPartitionAdvisor(regionFullPath);
 +              if (cpas != null && !cpas.isEmpty()) {
 +                for (ClientPartitionAdvisor colCPA : cpas) {
 +                  colCPA.updateBucketServerLocations(bucketId, locations, cms);
 +                }
 +              }
 +            }
 +          }
 +          if (isDebugEnabled) {
 +            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received ClientPRMetadata from server successfully.");
 +          }
 +          cms.setMetadataStable(true);
 +          return null;
 +        case MessageType.EXCEPTION:
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type EXCEPTION");
 +          }
 +          Part part = msg.getPart(0);
 +          Object obj = part.getObject();
 +          String s = "While performing  GetClientPRMetaDataOp "
 +              + ((Throwable)obj).getMessage();
 +          throw new ServerOperationException(s, (Throwable)obj);
 +        default:
 +          throw new InternalGemFireError(
 +              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
 +                  .toLocalizedString(Integer.valueOf(msg.getMessageType())));
 +      }
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetClientPRMetadata();
 +    }
 +
 +    protected String getOpName() {
 +      return "GetClientPRMetaDataOp";
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetClientPRMetadataSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetClientPRMetadata(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
index 004872c,0000000..e1a8870
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
@@@ -1,177 -1,0 +1,177 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.util.Set;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.FixedPartitionAttributes;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +
 +/**
 + * 
 + * Retrieves {@link ClientPartitionAdvisor} related information for the
 + * specified PartitionedRegion from one of the servers
 + * 
 + * @author Suranjan Kumar
 + * @author Yogesh Mahajan
 + * 
 + * @since 6.5
 + * 
 + */
 +public class GetClientPartitionAttributesOp {
 +
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  private GetClientPartitionAttributesOp() {
 +    // no instances allowed
 +  }
 +
 +  @SuppressWarnings("unchecked")
 +  public static ClientPartitionAdvisor execute(ExecutablePool pool, String regionFullPath) {
 +    AbstractOp op = new GetClientPartitionAttributesOpImpl(regionFullPath);
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("GetClientPartitionAttributesOp#execute : Sending GetClientPartitionAttributesOp Message: {} for region: {} to server using pool: {}", op.getMessage(), regionFullPath, pool);
 +    }
 +    
 +    ClientPartitionAdvisor advisor = (ClientPartitionAdvisor)pool.execute(op);
 +
 +    if (advisor != null) {
 +      advisor.setServerGroup(((PoolImpl)pool).getServerGroup());
 +    }
 +    
 +    return advisor;
 +  }
 +
 +  static class GetClientPartitionAttributesOpImpl extends AbstractOp {
 +
 +    String regionFullPath = null;
 +
 +    public GetClientPartitionAttributesOpImpl(String regionFullPath) {
 +      super(MessageType.GET_CLIENT_PARTITION_ATTRIBUTES, 1);
 +      this.regionFullPath = regionFullPath;
 +      getMessage().addStringPart(regionFullPath);
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      switch (msg.getMessageType()) {
 +        case MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR:
 +          String errorMsg = msg.getPart(0).getString();
 +          if (logger.isDebugEnabled()) {
 +            logger.debug(errorMsg);
 +          }
 +          throw new ServerOperationException(errorMsg);
 +        case MessageType.RESPONSE_CLIENT_PARTITION_ATTRIBUTES:
 +          final boolean isDebugEnabled = logger.isDebugEnabled();
 +          if (isDebugEnabled) {
 +            logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type : {}", MessageType.getString(msg.getMessageType()));
 +          }
 +          int bucketCount;
 +          String colocatedWith;
 +          String partitionResolverName = null;
 +          Set<FixedPartitionAttributes> fpaSet = null; 
 +          bucketCount = (Integer)msg.getPart(0).getObject();
 +          colocatedWith = (String)msg.getPart(1).getObject();
 +          if (msg.getNumberOfParts() == 4) {
 +            partitionResolverName = (String)msg.getPart(2).getObject();
 +            fpaSet = (Set<FixedPartitionAttributes>)msg.getPart(3).getObject();
 +          }
 +          else if (msg.getNumberOfParts() == 3) {
 +            Object obj = msg.getPart(2).getObject();
 +            if(obj instanceof String){
 +              partitionResolverName = (String)obj;
 +            }else{
 +              fpaSet = (Set<FixedPartitionAttributes>)obj;
 +            }
 +          }
 +          else if(bucketCount==-1){              
 +              return null;
 +          }
 +          if (isDebugEnabled) {
 +            logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received all the results from server successfully.");
 +          }
 +          ClientPartitionAdvisor advisor = new ClientPartitionAdvisor(bucketCount, colocatedWith,
 +                partitionResolverName, fpaSet);
 +          return advisor;
 +
 +        case MessageType.EXCEPTION:
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type EXCEPTION");
 +          }
 +          Part part = msg.getPart(0);
 +          Object obj = part.getObject();
 +          String s = "While performing  GetClientPartitionAttributesOp "+  ((Throwable)obj).getMessage();
 +          throw new ServerOperationException(s, (Throwable) obj);
 +        default:
 +          throw new InternalGemFireError(
 +              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
 +                  .toLocalizedString(Integer.valueOf(msg.getMessageType())));
 +      }
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetClientPartitionAttributes();
 +    }
 +
 +    protected String getOpName() {
 +      return "GetClientPartitionAttributesOp";
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetClientPartitionAttributesSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetClientPartitionAttributes(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
index 7dfe9af,0000000..1038ede
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
@@@ -1,121 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.EventID;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +
 +/**
 + * Gets (full) value (unlike GetOp, which may get either a full value or a delta
 + * depending upon delta flag) of a given event from the ha container on server.
 + * 
 + * @since 6.1
 + */
 +public class GetEventValueOp {
 +  /**
 +   * Does a get on the primary server using connections from the given pool
 +   * @param pool the pool to use to communicate with the server.
 +   * @param event the eventid to do the get on
 +   * @param callbackArg an optional callback arg to pass to any cache callbacks
 +   * @return the entry value found by the get if any
 +   */
 +  public static Object executeOnPrimary(ExecutablePool pool, EventID event,
 +      Object callbackArg) {
 +    AbstractOp op = new GetEventValueOpImpl(event, callbackArg);
 +    return pool.executeOnPrimary(op);
 +  }
 +
 +
 +  private GetEventValueOp() {
 +    // no instances allowed
 +  }
 +
 +  static class GetEventValueOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public GetEventValueOpImpl(EventID event, Object callbackArg) {
 +      super(MessageType.REQUEST_EVENT_VALUE, callbackArg != null ? 2 : 1);
 +      getMessage().addObjPart(event);
 +      if (callbackArg != null) {
 +        getMessage().addObjPart(callbackArg);
 +      }
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      final int msgType = msg.getMessageType();
 +      if (msgType == MessageType.RESPONSE) {
 +        return part;
 +      } else {
 +        if (msgType == MessageType.REQUEST_EVENT_VALUE_ERROR) {
 +          // Value not found in haContainer.
 +          return null;
 +        }
 +        else if (msgType == MessageType.EXCEPTION) {
 +          String s = "While performing a remote " + "getFullValue";
 +          throw new ServerOperationException(s, (Throwable) part.getObject());
 +          // Get the exception toString part.
 +          // This was added for c++ thin client and not used in java
 +          // Part exceptionToStringPart = msg.getPart(1);
 +        } else if (isErrorResponse(msgType)) {
 +          throw new ServerOperationException(part.getString());
 +        } else {
 +          throw new InternalGemFireError("Unexpected message type "
 +                                         + MessageType.getString(msgType));
 +        }
 +      }
 +    }
 +
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.REQUESTDATAERROR;
 +    }
 +
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGet();
 +    }
 +
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetSend(start, hasFailed());
 +    }
 +
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGet(start, hasTimedOut(), hasFailed());
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
index 9e63fba,0000000..177ea26
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
@@@ -1,84 -1,0 +1,84 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +
 +public class GetFunctionAttributeOp {
 +
 +  public static Object execute(ExecutablePool pool, String functionId) {
 +    AbstractOp op = new GetFunctionAttributeOpImpl(functionId);
 +    return pool.execute(op);
 +  }
 +
 +  private GetFunctionAttributeOp() {
 +    // no instances allowed
 +  }
 +
 +  static class GetFunctionAttributeOpImpl extends AbstractOp {
 +
 +    private String functionId = null;
 +
 +    public GetFunctionAttributeOpImpl(String functionId) {
 +      super(MessageType.GET_FUNCTION_ATTRIBUTES, 1);
 +      this.functionId = functionId;
 +      getMessage().addStringPart(this.functionId);
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      return processObjResponse(msg, "getFunctionAttribute");
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return msgType == MessageType.REQUESTDATAERROR;
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGet();
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetSend(start, hasFailed());
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGet(start, hasTimedOut(), hasFailed());
 +    }
 +
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
index d3e9efb,0000000..1e22d81
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
@@@ -1,94 -1,0 +1,92 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.pdx.internal.EnumInfo;
 +
 +/**
 + * Retrieve the PDX EnumInfo, given an integer enum id, from a server.
 + * @author darrel
 + * @since 6.6.2
 + */
 +public class GetPDXEnumByIdOp {
 +  /**
 +   * Get an enum from a server using the given pool.
 +   * @param pool the pool to use to communicate with the server.
 +   */
 +  public static EnumInfo execute(ExecutablePool pool,
 +                             int enumId)
 +  {
 +    AbstractOp op = new GetPDXEnumByIdOpImpl(enumId);
 +    return (EnumInfo) pool.execute(op);
 +  }
 +                                                               
 +  private GetPDXEnumByIdOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXEnumByIdOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public GetPDXEnumByIdOpImpl(int enumId) {
 +      super(MessageType.GET_PDX_ENUM_BY_ID, 1);
 +      getMessage().addIntPart(enumId);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      return processObjResponse(msg, "getPDXEnumById");
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetPDXTypeById(); // reuse PDXType stats instead of adding new enum ones
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeByIdSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +    
-     //TODO - no idea what this mumbo jumbo means, but it's on
-     //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
index 0cb5d33,0000000..0590caf
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
@@@ -1,112 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.util.Map;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.pdx.internal.EnumInfo;
 +
 +/**
 + * Retrieve all known PDX enums, as a map from integer enum id to EnumInfo.
 + * 
 + * @author bakera
 + * @since 7.0
 + */
 +public class GetPDXEnumsOp {
 +
 +  public static Map<Integer, EnumInfo> execute(ExecutablePool pool) {
 +    AbstractOp op = new GetPDXEnumsOpImpl();
 +    return (Map<Integer, EnumInfo>) pool.execute(op);
 +  }
 +                                                               
 +  private GetPDXEnumsOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXEnumsOpImpl extends AbstractOp {
 +    public GetPDXEnumsOpImpl() {
 +      super(MessageType.GET_PDX_ENUMS, 1);
 +      getMessage().addIntPart(0); // must have at least one part
 +    }
 +
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      int msgType = msg.getMessageType();
 +      if (msgType == MessageType.RESPONSE) {
 +        return (Map<Integer, EnumInfo>) part.getObject();
 +
 +      } else {
 +        if (msgType == MessageType.EXCEPTION) {
 +          String s = "While performing a remote " + "getPdxEnums";
 +          throw new ServerOperationException(s, (Throwable) part.getObject());
 +
 +        } else if (isErrorResponse(msgType)) {
 +          throw new ServerOperationException(part.getString());
 +
 +        } else {
 +          throw new InternalGemFireError("Unexpected message type "
 +              + MessageType.getString(msgType));
 +        }
 +      }
 +    }
 +
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return 0;
 +    }
 +
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +    }
 +
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +    }
 +    
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
 +
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
index 81a2b1b,0000000..bac2e80
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
@@@ -1,115 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.pdx.internal.EnumInfo;
 +
 +/**
 + * Retrieve the integer PDX id for a given enum (EnumInfo) from a server.
 + * @author darrel
 + * @since 6.6.2
 + */
 +public class GetPDXIdForEnumOp {
 +  /**
 +   * Get the integer PDX id for the given enum from a server,
 +   * using connections from the given pool
 +   * to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   */
 +  public static int execute(ExecutablePool pool,
 +                             EnumInfo ei)
 +  {
 +    AbstractOp op = new GetPDXIdForEnumOpImpl(ei);
 +    return ((Integer) pool.execute(op)).intValue();
 +  }
 +                                                               
 +  private GetPDXIdForEnumOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXIdForEnumOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public GetPDXIdForEnumOpImpl(EnumInfo ei) {
 +      super(MessageType.GET_PDX_ID_FOR_ENUM, 1);
 +      getMessage().addObjPart(ei);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      final int msgType = msg.getMessageType();
 +      if (msgType == MessageType.RESPONSE) {
 +        return Integer.valueOf(part.getInt());
 +      } else {
 +        if (msgType == MessageType.EXCEPTION) {
 +          String s = "While performing a remote " + "getPdxIdForEnum";
 +          throw new ServerOperationException(s, (Throwable) part.getObject());
 +          // Get the exception toString part.
 +          // This was added for c++ thin client and not used in java
 +          // Part exceptionToStringPart = msg.getPart(1);
 +        } else if (isErrorResponse(msgType)) {
 +          throw new ServerOperationException(part.getString());
 +        } else {
 +          throw new InternalGemFireError("Unexpected message type "
 +                                         + MessageType.getString(msgType));
 +        }
 +      }
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetPDXTypeById();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeByIdSend(start, hasFailed()); /* reusing type stats instead of adding enum ones */
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
-     //TODO - no idea what this mumbo jumbo means, but it's on
-     //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
index d771cb6,0000000..1b71f71
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
@@@ -1,115 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.cache.client.ServerOperationException;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
 +import com.gemstone.gemfire.pdx.internal.PdxType;
 +
 +/**
 + * Retrieve the integer PDX id for a given PdxType from a server.
 + * @author dsmith
 + * @since 6.6
 + */
 +public class GetPDXIdForTypeOp {
 +  /**
 +   * Get the PDX id for the given type from a server, using
 +   * connections from the given pool to communicate with the server.
 +   * @param pool the pool to use to communicate with the server.
 +   * @param type the PdxType whose id is requested.
 +   */
 +  public static int execute(ExecutablePool pool,
 +                             PdxType type)
 +  {
 +    AbstractOp op = new GetPDXIdForTypeOpImpl(type);
 +    return ((Integer) pool.execute(op)).intValue();
 +  }
 +                                                               
 +  private GetPDXIdForTypeOp() {
 +    // no instances allowed
 +  }
 +  
 +  private static class GetPDXIdForTypeOpImpl extends AbstractOp {
 +    /**
 +     * @throws com.gemstone.gemfire.SerializationException if serialization fails
 +     */
 +    public GetPDXIdForTypeOpImpl(PdxType type) {
 +      super(MessageType.GET_PDX_ID_FOR_TYPE, 1);
 +      getMessage().addObjPart(type);
 +    }
 +    @Override
 +    protected Object processResponse(Message msg) throws Exception {
 +      Part part = msg.getPart(0);
 +      final int msgType = msg.getMessageType();
 +      if (msgType == MessageType.RESPONSE) {
 +        return Integer.valueOf(part.getInt());
 +      } else {
 +        if (msgType == MessageType.EXCEPTION) {
 +          String s = "While performing a remote " + "getPdxIdForType";
 +          throw new ServerOperationException(s, (Throwable) part.getObject());
 +          // Get the exception toString part.
 +          // This was added for c++ thin client and not used in java
 +          // Part exceptionToStringPart = msg.getPart(1);
 +        } else if (isErrorResponse(msgType)) {
 +          throw new ServerOperationException(part.getString());
 +        } else {
 +          throw new InternalGemFireError("Unexpected message type "
 +                                         + MessageType.getString(msgType));
 +        }
 +      }
 +    }
 +    @Override
 +    protected boolean isErrorResponse(int msgType) {
 +      return false;
 +    }
 +    @Override
 +    protected long startAttempt(ConnectionStats stats) {
 +      return stats.startGetPDXTypeById();
 +    }
 +    @Override
 +    protected void endSendAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeByIdSend(start, hasFailed());
 +    }
 +    @Override
 +    protected void endAttempt(ConnectionStats stats, long start) {
 +      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
 +    }
 +    @Override
 +    protected void processSecureBytes(Connection cnx, Message message)
 +        throws Exception {
 +    }
 +    @Override
 +    protected boolean needsUserId() {
 +      return false;
 +    }
 +    //Don't send the transaction id for this message type.
 +    @Override
 +    protected boolean participateInTransaction() {
 +      return false;
 +    }
-     //TODO - no idea what this mumbo jumbo means, but it's on
-     //most of the other messages like this.
 +    @Override
 +    protected void sendMessage(Connection cnx) throws Exception {
-       getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++      getMessage().clearMessageHasSecurePartFlag();
 +      getMessage().send(false);
 +    }
 +  }
 +}