You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:38 UTC
[050/100] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
index 0ae0437,0000000..28c1e0f
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
@@@ -1,436 -1,0 +1,436 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.net.SocketTimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+
+/**
+ * Represents an operation that can be performed in a client by sending
+ * a message to a server.
+ * @since 5.7
+ */
+public abstract class AbstractOp implements Op {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private final Message msg;
+
+ protected AbstractOp(int msgType, int msgParts) {
+ this.msg = new Message(msgParts, Version.CURRENT);
+ getMessage().setMessageType(msgType);
+ }
+
+ /**
+ * Returns the message that this op will send to the server
+ */
+ protected Message getMessage() {
+ return this.msg;
+ }
+ protected void initMessagePart() {
+
+ }
+ /**
+ * Sets the transaction id on the message
+ */
+ private void setMsgTransactionId() {
+ if (participateInTransaction()
+ && getMessage().getTransactionId() == TXManagerImpl.NOTX) {
+ getMessage().setTransactionId(TXManagerImpl.getCurrentTXUniqueId());
+ }
+ }
+
+ /**
+ * Attempts to send this operation's message out on the
+ * given connection
+ * @param cnx the connection to use when sending
+ * @throws Exception if the send fails
+ */
+ protected void attemptSend(Connection cnx) throws Exception {
+ setMsgTransactionId();
+ if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending op={} using {}", getShortClassName(), cnx);
+ }
+ }
+ getMessage().setComms(cnx.getSocket(), cnx.getInputStream(),
+ cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
+ try {
+ sendMessage(cnx);
+ } finally {
+ getMessage().unsetComms();
+ }
+ }
+
+ /** returns the class name w/o package information. useful in logging */
+ public String getShortClassName() {
+ String cname = getClass().getName();
+ return cname.substring(getClass().getPackage().getName().length()+1);
+ }
+
+ /**
+ * New implementations of AbstractOp should override this method if the
+ * implementation should be excluded from client authentication. e.g.
+ * PingOp#sendMessage(Connection cnx)
+ *
+ * @see AbstractOp#needsUserId()
+ * @see AbstractOp#processSecureBytes(Connection, Message)
+ * @see ServerConnection#updateAndGetSecurityPart()
+ */
+ protected void sendMessage(Connection cnx) throws Exception {
+ if (cnx.getServer().getRequiresCredentials()) {
+ // Security is enabled on client as well as on server
- getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++ getMessage().setMessageHasSecurePartFlag();
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ long userId = -1;
+
+ if (UserAttributes.userAttributes.get() == null) { // single user mode
+ userId = cnx.getServer().getUserId();
+ } else { // multi user mode
+ Object id = UserAttributes.userAttributes.get().getServerToId().get(
+ cnx.getServer());
+ if (id == null) {
+ // This will ensure that this op is retried on another server, unless
+ // the retryCount is exhausted. Fix for Bug 41501
+ throw new ServerConnectivityException(
+ "Connection error while authenticating user"); // TODO:LOG hdos is not closed??
+ }
+ userId = (Long)id;
+ }
+ try {
+ hdos.writeLong(cnx.getConnectionID());
+ hdos.writeLong(userId);
+ getMessage().setSecurePart(
+ ((ConnectionImpl)cnx).getHandShake().encryptBytes(
+ hdos.toByteArray()));
+ } finally {
+ hdos.close();
+ }
+ }
+ getMessage().send(false);
+ }
+
+ /**
+ * Attempts to read a response to this operation by reading it from the
+ * given connection, and returning it.
+ * @param cnx the connection to read the response from
+ * @return the result of the operation
+ * or <code>null</code> if the operation has no result.
+ * @throws Exception if the execute failed
+ */
+ protected Object attemptReadResponse(Connection cnx) throws Exception {
+ Message msg = createResponseMessage();
+ if (msg != null) {
+ msg.setComms(cnx.getSocket(), cnx.getInputStream(),
+ cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
+ if (msg instanceof ChunkedMessage) {
+ try {
+ return processResponse(msg, cnx);
+ } finally {
+ msg.unsetComms();
+ // TODO (ashetkar) Handle the case when we fail to read the connection id.
+ processSecureBytes(cnx, msg);
+ }
+ } else {
+ try {
+ msg.recv();
+ } finally {
+ msg.unsetComms();
+ processSecureBytes(cnx, msg);
+ }
+ return processResponse(msg, cnx);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * New implementations of AbstractOp should override this method if the
+ * implementation should be excluded from client authentication. e.g.
+ * PingOp#processSecureBytes(Connection cnx, Message message)
+ *
+ * @see AbstractOp#sendMessage(Connection)
+ * @see AbstractOp#needsUserId()
+ * @see ServerConnection#updateAndGetSecurityPart()
+ */
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ if (cnx.getServer().getRequiresCredentials()) {
+ if (!message.isSecureMode()) {
+ // This can be seen during shutdown
+ if (logger.isDebugEnabled()) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "Response message from {} for {} has no secure part.", cnx, this);
+ }
+ return;
+ }
+ byte[] partBytes = message.getSecureBytes();
+ if (partBytes == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Response message for {} has no bytes in secure part.", this);
+ }
+ return;
+ }
+ byte[] bytes = ((ConnectionImpl)cnx).getHandShake().decryptBytes(
+ partBytes);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ cnx.setConnectionID(dis.readLong());
+ }
+ }
+
+ /**
+ * By default just create a normal one part msg.
+ * Subclasses can override this.
+ */
+ protected Message createResponseMessage() {
+ return new Message(1, Version.CURRENT);
+ }
+
+ protected Object processResponse(Message m, Connection con) throws Exception {
+ return processResponse(m);
+ }
+
+ /**
+ * Processes the given response message returning the result, if any,
+ * of the processing.
+ * @return the result of processing the response; null if no result
+ * @throws Exception if response could not be processed or
+ * we received a response with a server exception.
+ */
+ protected abstract Object processResponse(Message msg) throws Exception;
+
+ /**
+ * Return true of <code>msgType</code> indicates the operation
+ * had an error on the server.
+ */
+ protected abstract boolean isErrorResponse(int msgType);
+ /**
+ * Process a response that contains an ack.
+ * @param msg the message containing the response
+ * @param opName text describing this op
+ * @throws Exception if response could not be processed or
+ * we received a response with a server exception.
+ */
+ protected void processAck(Message msg, String opName)
+ throws Exception
+ {
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return;
+ } else {
+ Part part = msg.getPart(0);
+ if (msgType == MessageType.EXCEPTION) {
+ String s = ": While performing a remote " + opName;
+ Throwable t = (Throwable) part.getObject();
+ if (t instanceof PutAllPartialResultException) {
+ throw (PutAllPartialResultException)t;
+ } else {
+ throw new ServerOperationException(s, t);
+ }
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+ /**
+ * Process a response that contains a single Object result.
+ * @param msg the message containing the response
+ * @param opName text describing this op
+ * @return the result of the response
+ * @throws Exception if response could not be processed or
+ * we received a response with a server exception.
+ */
+ protected final Object processObjResponse(Message msg, String opName)
+ throws Exception
+ {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return part.getObject();
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + opName;
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+ /**
+ * Used by subclasses who get chunked responses.
+ */
+ public interface ChunkHandler {
+ /**
+ * This method will be called once for every incoming chunk
+ * @param msg the current chunk to handle
+ */
+ public void handle(ChunkedMessage msg) throws Exception;
+ }
+ /**
+ * Process a chunked response that contains a single Object result.
+ * @param msg the message containing the response
+ * @param opName text describing this op
+ * @param callback used to handle each chunks data
+ * @throws Exception if response could not be processed or
+ * we received a response with a server exception.
+ */
+ protected final void processChunkedResponse(ChunkedMessage msg, String opName, ChunkHandler callback)
+ throws Exception
+ {
+ msg.readHeader();
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ do {
+ msg.receiveChunk();
+ callback.handle(msg);
+ } while (!msg.isLastChunk());
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ msg.receiveChunk();
+ Part part = msg.getPart(0);
+ String s = "While performing a remote " + opName;
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ msg.receiveChunk();
+ Part part = msg.getPart(0);
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+
+ /**
+ * Set to true if this attempt failed
+ */
+ protected boolean failed;
+ /**
+ * Set to true if this attempt timed out
+ */
+ protected boolean timedOut;
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.Op#attempt(com.gemstone.gemfire.cache.client.internal.Connection)
+ */
+ public Object attempt(Connection cnx) throws Exception {
+ this.failed = true;
+ this.timedOut = false;
+ long start = startAttempt(cnx.getStats());
+ try {
+ try {
+ attemptSend(cnx);
+ this.failed = false;
+ } finally {
+ endSendAttempt(cnx.getStats(), start);
+ }
+ this.failed = true;
+ try {
+ Object result = attemptReadResponse(cnx);
+ this.failed = false;
+ return result;
+ } catch (SocketTimeoutException ste) {
+ this.failed = false;
+ this.timedOut = true;
+ throw ste;
+ } catch(Exception e) {
+ throw e;
+ }
+ } finally {
+ endAttempt(cnx.getStats(), start);
+ }
+ }
+ protected final boolean hasFailed() {
+ return this.failed;
+ }
+ protected final boolean hasTimedOut() {
+ return this.timedOut;
+ }
+ protected abstract long startAttempt(ConnectionStats stats);
+ protected abstract void endSendAttempt(ConnectionStats stats, long start);
+ protected abstract void endAttempt(ConnectionStats stats, long start);
+
+ /**
+ * New implementations of AbstractOp should override this method to return
+ * false if the implementation should be excluded from client authentication.
+ * e.g. PingOp#needsUserId()
+ * <P/>
+ * Also, such an operation's <code>MessageType</code> must be added in the
+ * 'if' condition in {@link ServerConnection#updateAndGetSecurityPart()}
+ *
+ * @return boolean
+ * @see AbstractOp#sendMessage(Connection)
+ * @see AbstractOp#processSecureBytes(Connection, Message)
+ * @see ServerConnection#updateAndGetSecurityPart()
+ */
+ protected boolean needsUserId() {
+ return true;
+ }
+
+ /**
+ * Subclasses for AbstractOp should override this method to return
+ * false in this message should not participate in any existing transaction
+ * @return true if the message should participate in transaction
+ */
+ protected boolean participateInTransaction() {
+ return true;
+ }
+
+ @Override
+ public boolean useThreadLocalConnection() {
+ return true;
+ }
+
+ public boolean isGatewaySenderOp() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
index b88948d,0000000..ff2bf1c
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXEnumOp.java
@@@ -1,98 -1,0 +1,98 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Push a PDX Enum id to other servers.
+ * @author darrel
+ * @since 6.6.2
+ */
+public class AddPDXEnumOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static void execute(ExecutablePool pool, int id,
+ EnumInfo ei)
+ {
+ AbstractOp op = new AddPdxEnumOpImpl(id, ei);
+ pool.execute(op);;
+ }
+
+ private AddPDXEnumOp() {
+ // no instances allowed
+ }
+
+ private static class AddPdxEnumOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public AddPdxEnumOpImpl(int id, EnumInfo ei) {
+ super(MessageType.ADD_PDX_ENUM, 2);
+ getMessage().addObjPart(ei);
+ getMessage().addIntPart(id);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "addPDXEnum");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startAddPdxType(); /* use the addPdxType stats instead of adding more stats */
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endAddPdxTypeSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endAddPdxType(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
+ //TODO - no idea what this mumbo jumbo means, but it's on
+ //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
index 92dd246,0000000..9fbc674
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AddPDXTypeOp.java
@@@ -1,98 -1,0 +1,96 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Add a PdxType to a server.
+ * @author dsmith
+ * @since 6.6
+ */
+public class AddPDXTypeOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static void execute(ExecutablePool pool, int id,
+ PdxType type)
+ {
+ AbstractOp op = new AddPDXTypeOpImpl(id, type);
+ pool.execute(op);
+ }
+
+ private AddPDXTypeOp() {
+ // no instances allowed
+ }
+
+ private static class AddPDXTypeOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public AddPDXTypeOpImpl(int id, PdxType type) {
+ super(MessageType.ADD_PDX_TYPE, 2);
+ getMessage().addObjPart(type);
+ getMessage().addIntPart(id);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "addPDXType");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startAddPdxType();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endAddPdxTypeSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endAddPdxType(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
- //TODO - no idea what this mumbo jumbo means, but it's on
- //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
index b0f0cec,0000000..b03c7b9
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AuthenticateUserOp.java
@@@ -1,312 -1,0 +1,312 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.Properties;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.command.PutUserCredentials;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import com.gemstone.gemfire.security.NotAuthorizedException;
+
+/**
+ * Authenticates this client (or a user) on a server. This op ideally should get
+ * executed once-per-server.
+ *
+ * When multiuser-authentication is set to false, this op gets executed
+ * immedialtely after a client-to-server connection is established.
+ *
+ * When multiuser-authentication is set to true, this op gets executed
+ * before the user attempts to perform an op whose
+ * {@link AbstractOp#needsUserId()} returns true.
+ *
+ * @author ashetkar
+ * @see PutUserCredentials
+ * @see ProxyCache
+ * @since 6.5
+ */
+public class AuthenticateUserOp {
+
+ /**
+ * Sends the auth credentials to the server. Used in single user mode of
+ * authentication.
+ *
+ * @param con
+ * The connection to use for this operation.
+ * @param pool
+ * The connection pool to use for this operation.
+ * @return Object unique user-id.
+ */
+ public static Object executeOn(Connection con, ExecutablePool pool) {
+ AbstractOp op = new AuthenticateUserOpImpl(con, pool);
+ return pool.executeOn(con, op);
+ }
+
+ /**
+ * Sends the auth credentials to the server for a particular user. Used in
+ * multiple user mode of authentication.
+ *
+ * @param location
+ * The ServerLocation instance whose connection instance will be used
+ * to perform the operation.
+ * @param pool
+ * The connection pool to use for this operation.
+ * @param securityProps
+ * @return Object unique user-id.
+ */
+ public static Object executeOn(ServerLocation location, ExecutablePool pool,
+ Properties securityProps) {
+ AbstractOp op = new AuthenticateUserOpImpl(pool, securityProps);
+ return pool.executeOn(location, op);
+ }
+
+ private AuthenticateUserOp() {
+ // no instances allowed
+ }
+
+ static class AuthenticateUserOpImpl extends AbstractOp {
+
+ private Properties securityProperties = null;
+ private boolean needsServerLocation = false;
+
+ public AuthenticateUserOpImpl(Connection con, ExecutablePool pool) {
+ super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
+ byte[] credentialBytes = null;
+ // TODO this is not a valid way to create a member ID
+ DistributedMember server = new InternalDistributedMember(con.getSocket()
+ .getInetAddress(), con.getSocket().getPort(), false);
+ DistributedSystem sys = InternalDistributedSystem.getConnectedInstance();
+ String authInitMethod = sys.getProperties().getProperty(
+ DistributionConfig.SECURITY_CLIENT_AUTH_INIT_NAME);
+ Properties tmpSecurityProperties = sys.getSecurityProperties();
+
+ // LOG: following passes the DS API LogWriters into the security API
+ Properties credentials = HandShake.getCredentials(authInitMethod,
+ tmpSecurityProperties, server, false, (InternalLogWriter)sys.getLogWriter(), (InternalLogWriter)sys
+ .getSecurityLogWriter());
+
- getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++ getMessage().setMessageHasSecurePartFlag();
+ HeapDataOutputStream heapdos = new HeapDataOutputStream(Version.CURRENT);
+ try {
+ DataSerializer.writeProperties(credentials, heapdos);
+ credentialBytes = ((ConnectionImpl)con).getHandShake()
+ .encryptBytes(heapdos.toByteArray());
+ } catch (Exception e) {
+ throw new ServerOperationException(e);
+ } finally {
+ heapdos.close();
+ }
+ getMessage().addBytesPart(credentialBytes);
+ }
+
+ public AuthenticateUserOpImpl(ExecutablePool pool, Properties securityProps) {
+ this(pool, securityProps, false);
+ }
+
+ public AuthenticateUserOpImpl(ExecutablePool pool, Properties securityProps, boolean needsServer) {
+ super(MessageType.USER_CREDENTIAL_MESSAGE, 1);
+ this.securityProperties = securityProps;
+ this.needsServerLocation = needsServer;
+
- getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++ getMessage().setMessageHasSecurePartFlag();
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ byte[] secureBytes = null;
+ hdos.writeLong(cnx.getConnectionID());
+ if (this.securityProperties != null) {
+ byte[] credentialBytes = null;
+ // TODO this is not a valid way to create a member ID
+ DistributedMember server = new InternalDistributedMember(cnx
+ .getSocket().getInetAddress(), cnx.getSocket().getPort(), false);
+ DistributedSystem sys = InternalDistributedSystem
+ .getConnectedInstance();
+ String authInitMethod = sys.getProperties().getProperty(
+ DistributionConfig.SECURITY_CLIENT_AUTH_INIT_NAME);
+
+ Properties credentials = HandShake.getCredentials(authInitMethod,
+ this.securityProperties, server, false, (InternalLogWriter)sys.getLogWriter(), (InternalLogWriter)sys
+ .getSecurityLogWriter());
+ HeapDataOutputStream heapdos = new HeapDataOutputStream(Version.CURRENT);
+ try {
+ DataSerializer.writeProperties(credentials, heapdos);
+ credentialBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
+ heapdos.toByteArray());
+ } finally {
+ heapdos.close();
+ }
+ getMessage().addBytesPart(credentialBytes);
+ }
+ try {
+ secureBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
+ hdos.toByteArray());
+ } finally {
+ hdos.close();
+ }
+ getMessage().setSecurePart(secureBytes);
+ getMessage().send(false);
+ }
+
+ @Override
+ public Object attempt(Connection cnx) throws Exception {
+ if (cnx.getServer().getRequiresCredentials()) {
+ return super.attempt(cnx);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ protected Object attemptReadResponse(Connection cnx) throws Exception {
+ Message msg = createResponseMessage();
+ if (msg != null) {
+ msg.setComms(cnx.getSocket(), cnx.getInputStream(),
+ cnx.getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
+ if (msg instanceof ChunkedMessage) {
+ try {
+ return processResponse(cnx, msg);
+ } finally {
+ msg.unsetComms();
+ processSecureBytes(cnx, msg);
+ }
+ } else {
+ try {
+ msg.recv();
+ } finally {
+ msg.unsetComms();
+ processSecureBytes(cnx, msg);
+ }
+ return processResponse(cnx, msg);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ protected Object processResponse(Connection cnx, Message msg) throws Exception {
+ byte[] bytes = null;
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ long userId = -1;
+ if (msgType == MessageType.RESPONSE) {
+ bytes = (byte[])part.getObject();
+ if (bytes.length == 0) {
+ cnx.getServer().setRequiresCredentials(false);
+ } else {
+ cnx.getServer().setRequiresCredentials(true);
+ byte[] decrypted = ((ConnectionImpl)cnx).getHandShake().decryptBytes(bytes);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(decrypted));
+ userId = dis.readLong();
+ }
+ if (this.needsServerLocation) {
+ return new Object[] {cnx.getServer(), userId};
+ } else {
+ return userId;
+ }
+ }
+ else if (msgType == MessageType.EXCEPTION) {
+ Object result = part.getObject();
+ String s = "While performing a remote authenticate";
+ if (result instanceof AuthenticationFailedException) {
+ final AuthenticationFailedException afe =
+ (AuthenticationFailedException)result;
+ if ("REPLY_REFUSED".equals(afe.getMessage())) {
+ throw new AuthenticationFailedException(s, afe.getCause());
+ }
+ else {
+ throw new AuthenticationFailedException(s, afe);
+ }
+ }
+ else if (result instanceof AuthenticationRequiredException) {
+ throw new AuthenticationRequiredException(s,
+ (AuthenticationRequiredException)result);
+ }
+ else if (result instanceof NotAuthorizedException) {
+ throw new NotAuthorizedException(s, (NotAuthorizedException)result);
+ }
+ else {
+ throw new ServerOperationException(s, (Throwable)result);
+ }
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ }
+ else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ }
+ else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGet();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGet(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ return null;
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
index 943f0a3,0000000..7de38fe
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
@@@ -1,95 -1,0 +1,95 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server that a connection is being closed
+ * @author darrel
+ * @since 5.7
+ */
+public class CloseConnectionOp {
+ /**
+ * Tell a server that a connection is being closed
+ * @param con the connection that is being closed
+ * @param keepAlive whether to keep the proxy alive on the server
+ */
+ public static void execute(Connection con, boolean keepAlive)
+ throws Exception
+ {
+ AbstractOp op = new CloseConnectionOpImpl(keepAlive);
+ con.execute(op);
+ }
+
+ private CloseConnectionOp() {
+ // no instances allowed
+ }
+
+ private static class CloseConnectionOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public CloseConnectionOpImpl(boolean keepAlive) {
+ super(MessageType.CLOSE_CONNECTION, 1);
+ getMessage().addRawPart(new byte[]{(byte)(keepAlive?1:0)}, false);
+ }
+ @Override
+ protected Message createResponseMessage() {
+ // no response is sent
+ return null;
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ throw new IllegalStateException("should never be called");
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startCloseCon();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endCloseConSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endCloseCon(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
index e04a466,0000000..c9c6dd7
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
@@@ -1,109 -1,0 +1,109 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a commit on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class CommitOp {
+ /**
+ * Does a commit on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static TXCommitMessage execute(ExecutablePool pool,int txId)
+ {
+ CommitOpImpl op = new CommitOpImpl(txId);
+ pool.execute(op);
+ return op.getTXCommitMessageResponse();
+ }
+
+ private CommitOp() {
+ // no instances allowed
+ }
+
+
+ private static class CommitOpImpl extends AbstractOp {
+ private int txId;
+
+ private TXCommitMessage tXCommitMessageResponse = null;
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public CommitOpImpl(int txId) {
+ super(MessageType.COMMIT, 1);
+ getMessage().setTransactionId(txId);
+ this.txId = txId;
+ }
+
+ public TXCommitMessage getTXCommitMessageResponse() {
+ return tXCommitMessageResponse;
+ }
+
+ @Override
+ public String toString() {
+ return "TXCommit(txId="+this.txId+")";
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, "commit");
+ assert rcs != null : "TxCommit response was null";
+ this.tXCommitMessageResponse = rcs;
+ return rcs;
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXCEPTION;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startCommit();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endCommitSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endCommit(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
index f88d1e9,0000000..8bae6ff
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
@@@ -1,171 -1,0 +1,171 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Retrieves {@link ClientPartitionAdvisor} for the specified PartitionedRegion from
+ * one of the servers
+ *
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ *
+ * @since 6.5
+ */
+public class GetClientPRMetaDataOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private GetClientPRMetaDataOp() {
+ // no instances allowed
+ }
+
+ public static void execute(ExecutablePool pool, String regionFullPath,
+ ClientMetadataService cms) {
+ AbstractOp op = new GetClientPRMetaDataOpImpl(regionFullPath, cms);
+ if (logger.isDebugEnabled()) {
+ logger.debug("GetClientPRMetaDataOp#execute : Sending GetClientPRMetaDataOp Message: {} to server using pool: {}", op.getMessage(), pool);
+ }
+ pool.execute(op);
+ }
+
+ static class GetClientPRMetaDataOpImpl extends AbstractOp {
+
+ String regionFullPath = null;
+
+ ClientMetadataService cms = null;
+
+ public GetClientPRMetaDataOpImpl(String regionFullPath, ClientMetadataService cms) {
+ super(MessageType.GET_CLIENT_PR_METADATA, 1);
+ this.regionFullPath = regionFullPath;
+ this.cms = cms;
+ getMessage().addStringPart(regionFullPath);
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ switch (msg.getMessageType()) {
+ case MessageType.GET_CLIENT_PR_METADATA_ERROR:
+ String errorMsg = msg.getPart(0).getString();
+ if (logger.isDebugEnabled()) {
+ logger.debug(errorMsg);
+ }
+ throw new ServerOperationException(errorMsg);
+ case MessageType.RESPONSE_CLIENT_PR_METADATA:
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type : {}" + MessageType.getString(msg.getMessageType()));
+ }
+ int numParts = msg.getNumberOfParts();
+ ClientPartitionAdvisor advisor = cms
+ .getClientPartitionAdvisor(regionFullPath);
+ for (int i = 0; i < numParts; i++) {
+ Object result = msg.getPart(i).getObject();
+ List<BucketServerLocation66> locations = (List<BucketServerLocation66>)result;
+ if (!locations.isEmpty()) {
+ int bucketId = locations.get(0).getBucketId();
+ if (isDebugEnabled) {
+ logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations);
+ }
+ advisor.updateBucketServerLocations(bucketId, locations, cms);
+
+ Set<ClientPartitionAdvisor> cpas = cms
+ .getColocatedClientPartitionAdvisor(regionFullPath);
+ if (cpas != null && !cpas.isEmpty()) {
+ for (ClientPartitionAdvisor colCPA : cpas) {
+ colCPA.updateBucketServerLocations(bucketId, locations, cms);
+ }
+ }
+ }
+ }
+ if (isDebugEnabled) {
+ logger.debug("GetClientPRMetaDataOpImpl#processResponse: received ClientPRMetadata from server successfully.");
+ }
+ cms.setMetadataStable(true);
+ return null;
+ case MessageType.EXCEPTION:
+ if (logger.isDebugEnabled()) {
+ logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type EXCEPTION");
+ }
+ Part part = msg.getPart(0);
+ Object obj = part.getObject();
+ String s = "While performing GetClientPRMetaDataOp "
+ + ((Throwable)obj).getMessage();
+ throw new ServerOperationException(s, (Throwable)obj);
+ default:
+ throw new InternalGemFireError(
+ LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+ .toLocalizedString(Integer.valueOf(msg.getMessageType())));
+ }
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetClientPRMetadata();
+ }
+
+ protected String getOpName() {
+ return "GetClientPRMetaDataOp";
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetClientPRMetadataSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetClientPRMetadata(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
index 004872c,0000000..e1a8870
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
@@@ -1,177 -1,0 +1,177 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ *
+ * Retrieves {@link ClientPartitionAdvisor} related information for the
+ * specified PartitionedRegion from one of the servers
+ *
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ *
+ * @since 6.5
+ *
+ */
+public class GetClientPartitionAttributesOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private GetClientPartitionAttributesOp() {
+ // no instances allowed
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ClientPartitionAdvisor execute(ExecutablePool pool, String regionFullPath) {
+ AbstractOp op = new GetClientPartitionAttributesOpImpl(regionFullPath);
+ if (logger.isDebugEnabled()) {
+ logger.debug("GetClientPartitionAttributesOp#execute : Sending GetClientPartitionAttributesOp Message: {} for region: {} to server using pool: {}", op.getMessage(), regionFullPath, pool);
+ }
+
+ ClientPartitionAdvisor advisor = (ClientPartitionAdvisor)pool.execute(op);
+
+ if (advisor != null) {
+ advisor.setServerGroup(((PoolImpl)pool).getServerGroup());
+ }
+
+ return advisor;
+ }
+
+ static class GetClientPartitionAttributesOpImpl extends AbstractOp {
+
+ String regionFullPath = null;
+
+ public GetClientPartitionAttributesOpImpl(String regionFullPath) {
+ super(MessageType.GET_CLIENT_PARTITION_ATTRIBUTES, 1);
+ this.regionFullPath = regionFullPath;
+ getMessage().addStringPart(regionFullPath);
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ switch (msg.getMessageType()) {
+ case MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR:
+ String errorMsg = msg.getPart(0).getString();
+ if (logger.isDebugEnabled()) {
+ logger.debug(errorMsg);
+ }
+ throw new ServerOperationException(errorMsg);
+ case MessageType.RESPONSE_CLIENT_PARTITION_ATTRIBUTES:
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type : {}", MessageType.getString(msg.getMessageType()));
+ }
+ int bucketCount;
+ String colocatedWith;
+ String partitionResolverName = null;
+ Set<FixedPartitionAttributes> fpaSet = null;
+ bucketCount = (Integer)msg.getPart(0).getObject();
+ colocatedWith = (String)msg.getPart(1).getObject();
+ if (msg.getNumberOfParts() == 4) {
+ partitionResolverName = (String)msg.getPart(2).getObject();
+ fpaSet = (Set<FixedPartitionAttributes>)msg.getPart(3).getObject();
+ }
+ else if (msg.getNumberOfParts() == 3) {
+ Object obj = msg.getPart(2).getObject();
+ if(obj instanceof String){
+ partitionResolverName = (String)obj;
+ }else{
+ fpaSet = (Set<FixedPartitionAttributes>)obj;
+ }
+ }
+ else if(bucketCount==-1){
+ return null;
+ }
+ if (isDebugEnabled) {
+ logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received all the results from server successfully.");
+ }
+ ClientPartitionAdvisor advisor = new ClientPartitionAdvisor(bucketCount, colocatedWith,
+ partitionResolverName, fpaSet);
+ return advisor;
+
+ case MessageType.EXCEPTION:
+ if (logger.isDebugEnabled()) {
+ logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type EXCEPTION");
+ }
+ Part part = msg.getPart(0);
+ Object obj = part.getObject();
+ String s = "While performing GetClientPartitionAttributesOp "+ ((Throwable)obj).getMessage();
+ throw new ServerOperationException(s, (Throwable) obj);
+ default:
+ throw new InternalGemFireError(
+ LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+ .toLocalizedString(Integer.valueOf(msg.getMessageType())));
+ }
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetClientPartitionAttributes();
+ }
+
+ protected String getOpName() {
+ return "GetClientPartitionAttributesOp";
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetClientPartitionAttributesSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetClientPartitionAttributes(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
index 7dfe9af,0000000..1038ede
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
@@@ -1,121 -1,0 +1,121 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+/**
+ * Gets (full) value (unlike GetOp, which may get either a full value or a delta
+ * depending upon delta flag) of a given event from the ha container on server.
+ *
+ * @since 6.1
+ */
+public class GetEventValueOp {
+ /**
+ * Does a get on the primary server using connections from the given pool
+ * @param pool the pool to use to communicate with the server.
+ * @param event the eventid to do the get on
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ * @return the entry value found by the get if any
+ */
+ public static Object executeOnPrimary(ExecutablePool pool, EventID event,
+ Object callbackArg) {
+ AbstractOp op = new GetEventValueOpImpl(event, callbackArg);
+ return pool.executeOnPrimary(op);
+ }
+
+
+ private GetEventValueOp() {
+ // no instances allowed
+ }
+
+ static class GetEventValueOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public GetEventValueOpImpl(EventID event, Object callbackArg) {
+ super(MessageType.REQUEST_EVENT_VALUE, callbackArg != null ? 2 : 1);
+ getMessage().addObjPart(event);
+ if (callbackArg != null) {
+ getMessage().addObjPart(callbackArg);
+ }
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return part;
+ } else {
+ if (msgType == MessageType.REQUEST_EVENT_VALUE_ERROR) {
+ // Value not found in haContainer.
+ return null;
+ }
+ else if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + "getFullValue";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGet();
+ }
+
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetSend(start, hasFailed());
+ }
+
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGet(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
index 9e63fba,0000000..177ea26
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
@@@ -1,84 -1,0 +1,84 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+public class GetFunctionAttributeOp {
+
+ public static Object execute(ExecutablePool pool, String functionId) {
+ AbstractOp op = new GetFunctionAttributeOpImpl(functionId);
+ return pool.execute(op);
+ }
+
+ private GetFunctionAttributeOp() {
+ // no instances allowed
+ }
+
+ static class GetFunctionAttributeOpImpl extends AbstractOp {
+
+ private String functionId = null;
+
+ public GetFunctionAttributeOpImpl(String functionId) {
+ super(MessageType.GET_FUNCTION_ATTRIBUTES, 1);
+ this.functionId = functionId;
+ getMessage().addStringPart(this.functionId);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ return processObjResponse(msg, "getFunctionAttribute");
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGet();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGet(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
index d3e9efb,0000000..1e22d81
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
@@@ -1,94 -1,0 +1,92 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve the PDXType, given an integer PDX id, from a server.
+ * @author darrel
+ * @since 6.6.2
+ */
+public class GetPDXEnumByIdOp {
+ /**
+ * Get a enum from the given pool.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static EnumInfo execute(ExecutablePool pool,
+ int enumId)
+ {
+ AbstractOp op = new GetPDXEnumByIdOpImpl(enumId);
+ return (EnumInfo) pool.execute(op);
+ }
+
+ private GetPDXEnumByIdOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXEnumByIdOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public GetPDXEnumByIdOpImpl(int enumId) {
+ super(MessageType.GET_PDX_ENUM_BY_ID, 1);
+ getMessage().addIntPart(enumId);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ return processObjResponse(msg, "getPDXEnumById");
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetPDXTypeById(); // reuse PDXType stats instead of adding new enum ones
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeByIdSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
- //TODO - no idea what this mumbo jumbo means, but it's on
- //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
index 0cb5d33,0000000..0590caf
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
@@@ -1,112 -1,0 +1,112 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve all known PDX types.
+ *
+ * @author bakera
+ * @since 7.0
+ */
+public class GetPDXEnumsOp {
+
+ public static Map<Integer, EnumInfo> execute(ExecutablePool pool) {
+ AbstractOp op = new GetPDXEnumsOpImpl();
+ return (Map<Integer, EnumInfo>) pool.execute(op);
+ }
+
+ private GetPDXEnumsOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXEnumsOpImpl extends AbstractOp {
+ public GetPDXEnumsOpImpl() {
+ super(MessageType.GET_PDX_ENUMS, 1);
+ getMessage().addIntPart(0); // must have at least one part
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return (Map<Integer, EnumInfo>) part.getObject();
+
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + "getPdxEnums";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return 0;
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
index 81a2b1b,0000000..bac2e80
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
@@@ -1,115 -1,0 +1,113 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve the PDXType, given an integer PDX id, from a server.
+ * @author darrel
+ * @since 6.6.2
+ */
+public class GetPDXIdForEnumOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static int execute(ExecutablePool pool,
+ EnumInfo ei)
+ {
+ AbstractOp op = new GetPDXIdForEnumOpImpl(ei);
+ return ((Integer) pool.execute(op)).intValue();
+ }
+
+ private GetPDXIdForEnumOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXIdForEnumOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public GetPDXIdForEnumOpImpl(EnumInfo ei) {
+ super(MessageType.GET_PDX_ID_FOR_ENUM, 1);
+ getMessage().addObjPart(ei);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return Integer.valueOf(part.getInt());
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + "getPdxIdForEnum";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetPDXTypeById();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeByIdSend(start, hasFailed()); /* reusing type stats instead of adding enum ones */
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
- //TODO - no idea what this mumbo jumbo means, but it's on
- //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
index d771cb6,0000000..1b71f71
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
@@@ -1,115 -1,0 +1,113 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve the PDXType, given an integer PDX id, from a server.
+ * @author dsmith
+ * @since 6.6
+ */
+public class GetPDXIdForTypeOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static int execute(ExecutablePool pool,
+ PdxType type)
+ {
+ AbstractOp op = new GetPDXIdForTypeOpImpl(type);
+ return ((Integer) pool.execute(op)).intValue();
+ }
+
+ private GetPDXIdForTypeOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXIdForTypeOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public GetPDXIdForTypeOpImpl(PdxType type) {
+ super(MessageType.GET_PDX_ID_FOR_TYPE, 1);
+ getMessage().addObjPart(type);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return Integer.valueOf(part.getInt());
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + "getPdxIdForType";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetPDXTypeById();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeByIdSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
- //TODO - no idea what this mumbo jumbo means, but it's on
- //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}