You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 01:00:07 UTC
[49/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
index e0b2810,0000000..1d9bd2d
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
@@@ -1,94 -1,0 +1,92 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve the PDXType, given an integer PDX id, from a server.
+ * @author dsmith
+ * @since 6.6
+ */
+public class GetPDXTypeByIdOp {
+ /**
+ * Get a PdxType from the given pool.
+ * @param pool the pool to use to communicate with the server.
+ */
+ public static PdxType execute(ExecutablePool pool,
+ int pdxId)
+ {
+ AbstractOp op = new GetPDXTypeByIdOpImpl(pdxId);
+ return (PdxType) pool.execute(op);
+ }
+
+ private GetPDXTypeByIdOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXTypeByIdOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public GetPDXTypeByIdOpImpl(int pdxId) {
+ super(MessageType.GET_PDX_TYPE_BY_ID, 1);
+ getMessage().addIntPart(pdxId);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ return processObjResponse(msg, "getPDXTypeById");
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGetPDXTypeById();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeByIdSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ //Don't send the transaction id for this message type.
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
- //TODO - no idea what this mumbo jumbo means, but it's on
- //most of the other messages like this.
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
index 2990192,0000000..262cb9a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
@@@ -1,112 -1,0 +1,112 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve all known PDX types.
+ *
+ * @author bakera
+ * @since 7.0
+ */
+public class GetPDXTypesOp {
+
+ public static Map<Integer, PdxType> execute(ExecutablePool pool) {
+ AbstractOp op = new GetPDXTypesOpImpl();
+ return (Map<Integer, PdxType>) pool.execute(op);
+ }
+
+ private GetPDXTypesOp() {
+ // no instances allowed
+ }
+
+ private static class GetPDXTypesOpImpl extends AbstractOp {
+ public GetPDXTypesOpImpl() {
+ super(MessageType.GET_PDX_TYPES, 1);
+ getMessage().addIntPart(0); // must have at least one part
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ int msgType = msg.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ return (Map<Integer, PdxType>) part.getObject();
+
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote " + "getPdxTypes";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+
+ } else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return 0;
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected boolean participateInTransaction() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
index 59b99f0,0000000..27d80b1
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
@@@ -1,91 -1,0 +1,91 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server to become the primary host of a server-to-client queue
+ * @author darrel
+ * @since 5.7
+ */
+public class MakePrimaryOp {
+ /**
+ * Tell the given server to become the primary host of a server-to-client queue
+ * @param pool the pool to use to communicate with the server.
+ * @param conn the connection to do the execution on
+ * @param sentClientReady true if the client ready message has already been sent
+ */
+ public static void execute(ExecutablePool pool, Connection conn, boolean sentClientReady)
+ {
+ AbstractOp op = new MakePrimaryOpImpl(sentClientReady);
+ pool.executeOn(conn, op);
+ }
+
+ private MakePrimaryOp() {
+ // no instances allowed
+ }
+
+ private static class MakePrimaryOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public MakePrimaryOpImpl(boolean sentClientReady) {
+ super(MessageType.MAKE_PRIMARY, 1);
+ getMessage().addBytesPart(new byte[] {(byte)(sentClientReady?0x01:0x00)});
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "makePrimary");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startMakePrimary();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endMakePrimarySend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endMakePrimary(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
index e0bc81b,0000000..e70d50a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
@@@ -1,97 -1,0 +1,97 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Ping a server to see if it is still alive.
+ * @author darrel
+ * @since 5.7
+ */
+public class PingOp {
+ /**
+ * Ping the specified server to see if it is still alive
+ * @param pool the pool to use to communicate with the server.
+ * @param server the server to do the execution on
+ */
+ public static void execute(ExecutablePool pool, ServerLocation server)
+ {
+ AbstractOp op = new PingOpImpl();
+ pool.executeOn(server, op, false,false);
+ }
+
+ private PingOp() {
+ // no instances allowed
+ }
+
+ static class PingOpImpl extends AbstractOp {
+
+ private long startTime;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public PingOpImpl() {
+ super(MessageType.PING, 0);
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ Message.messageType.set(null);
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ startTime = System.currentTimeMillis();
+ getMessage().send(false);
+ Message.messageType.set(MessageType.PING);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "ping");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startPing();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endPingSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endPing(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
index 0b65c56,0000000..4ee680a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
@@@ -1,101 -1,0 +1,101 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Send the primary server acknowledgement on the events this client
+ * has received and processed from it.
+ * @author darrel
+ * @since 5.7
+ */
+public class PrimaryAckOp {
+ /**
+ * Send the primary server acknowledgement on the events this client
+ * has received and processed from it
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param connection
+ * @param pool the pool to use to communicate with the server.
+ * @param events list of events to acknowledge
+ */
+ public static void execute(Connection connection, ExecutablePool pool,
+ List events)
+ {
+ AbstractOp op = new PrimaryAckOpImpl(events);
+ pool.executeOn(connection, op);
+ }
+
+ private PrimaryAckOp() {
+ // no instances allowed
+ }
+
+ private static class PrimaryAckOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public PrimaryAckOpImpl(List events) {
+ super(MessageType.PERIODIC_ACK, events.size());
+ for (Iterator i = events.iterator(); i.hasNext();) {
+ getMessage().addObjPart(i.next());
+ }
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "primaryAck");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startPrimaryAck();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endPrimaryAckSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endPrimaryAck(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
index 53b8fa9,0000000..2747fa8
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
@@@ -1,124 -1,0 +1,124 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Properties;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+public class ProxyCacheCloseOp {
+
+ public static Object executeOn(ServerLocation location, ExecutablePool pool,
+ Properties securityProps, boolean keepAlive) {
+ AbstractOp op = new ProxyCacheCloseOpImpl(pool, securityProps, keepAlive);
+ return pool.executeOn(location, op);
+ }
+
+ private ProxyCacheCloseOp() {
+ // no instances allowed
+ }
+
+ static class ProxyCacheCloseOpImpl extends AbstractOp {
+
+ public ProxyCacheCloseOpImpl(ExecutablePool pool, Properties securityProps,
+ boolean keepAlive) {
+ super(MessageType.REMOVE_USER_AUTH, 1);
- getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
++ getMessage().setMessageHasSecurePartFlag();
+ getMessage().addBytesPart(keepAlive ? new byte[] {1} : new byte[] {0});
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ byte[] secureBytes = null;
+ hdos.writeLong(cnx.getConnectionID());
+ Object userId = UserAttributes.userAttributes.get().getServerToId().get(cnx.getServer());
+ if (userId == null) {
+ // This will ensure that this op is retried on another server, unless
+ // the retryCount is exhausted. Fix for Bug 41501
+ throw new ServerConnectivityException(
+ "Connection error while authenticating user");
+ }
+ hdos.writeLong((Long)userId);
+ try {
+ secureBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
+ hdos.toByteArray());
+ } finally {
+ hdos.close();
+ }
+ getMessage().setSecurePart(secureBytes);
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return part.getObject();
+ }
+ else if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote proxy cache close";
+ throw new ServerOperationException(s, (Throwable)part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ }
+ else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ }
+ else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGet();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGet(start, hasTimedOut(), hasFailed());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
index a003538,0000000..d2631fc
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
@@@ -1,91 -1,0 +1,91 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tells the server we are ready to receive server-to-client events
+ * from durable subscriptions.
+ * @author darrel
+ * @since 5.7
+ */
+public class ReadyForEventsOp {
+ /**
+ * Tells the primary server we are ready to receive server-to-client events
+ * from durable subscriptions.
+ * @param pool the pool to use to communicate with the server.
+ * @param primary
+ */
+ public static void execute(ExecutablePool pool, QueueConnectionImpl primary)
+ {
+ AbstractOp op = new ReadyForEventsOpImpl();
+ pool.executeOn(primary, op);
+ }
+
+ private ReadyForEventsOp() {
+ // no instances allowed
+ }
+
+ private static class ReadyForEventsOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public ReadyForEventsOpImpl() {
+ super(MessageType.CLIENT_READY, 1);
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "readyForEvents");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startReadyForEvents();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endReadyForEventsSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endReadyForEvents(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
index b2b975f,0000000..869ad64
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
@@@ -1,138 -1,0 +1,138 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
+import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+public class RegisterDataSerializersOp {
+
+ public static void execute(ExecutablePool pool,
+ DataSerializer[] dataSerializers, EventID eventId) {
+ AbstractOp op = new RegisterDataSerializersOpImpl(dataSerializers,
+ eventId);
+ pool.execute(op);
+ }
+
+ public static void execute(ExecutablePool pool,
+ SerializerAttributesHolder[] holders, EventID eventId) {
+ AbstractOp op = new RegisterDataSerializersOpImpl(holders,
+ eventId);
+ pool.execute(op);
+ }
+
+ private RegisterDataSerializersOp() {
+ // no instances allowed
+ }
+
+ private static class RegisterDataSerializersOpImpl extends AbstractOp {
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RegisterDataSerializersOpImpl(DataSerializer[] dataSerializers,
+ EventID eventId) {
+ super(MessageType.REGISTER_DATASERIALIZERS, dataSerializers.length * 2 + 1);
+ for(int i = 0; i < dataSerializers.length; i++) {
+ DataSerializer dataSerializer = dataSerializers[i];
+ // strip '.class' off these class names
+ String className = dataSerializer.getClass().toString().substring(6);
+ try {
+ getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(dataSerializer.getId());
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+ // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ ClientServerObserver bo = ClientServerObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ /**
+ * @throws SerializationException
+ * Thrown when serialization fails.
+ */
+ public RegisterDataSerializersOpImpl(SerializerAttributesHolder[] holders,
+ EventID eventId) {
+ super(MessageType.REGISTER_DATASERIALIZERS, holders.length * 2 + 1);
+ for (int i = 0; i < holders.length; i++) {
+ try {
+ getMessage().addBytesPart(
+ BlobHelper.serializeToBlob(holders[i].getClassName()));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(holders[i].getId());
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+ // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ ClientServerObserver bo = ClientServerObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "registerDataSerializers");
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRegisterDataSerializers();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterDataSerializersSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterDataSerializers(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
index 93d3756,0000000..0d5a137
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
@@@ -1,180 -1,0 +1,180 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.Instantiator;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
+import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+/**
+ * Register a bunch of instantiators on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInstantiatorsOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param instantiators the instantiators to register
+ * @param eventId the id of this event
+ */
+ public static void execute(ExecutablePool pool,
+ Instantiator[] instantiators,
+ EventID eventId)
+ {
+ AbstractOp op = new RegisterInstantiatorsOpImpl(instantiators, eventId);
+ pool.execute(op, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Register a bunch of instantiators on a server using connections from the
+ * given pool to communicate with the server.
+ *
+ * @param pool
+ * the pool to use to communicate with the server.
+ * @param holders
+ * the {@link InstantiatorAttributesHolder}s containing info about
+ * the instantiators to register
+ * @param eventId
+ * the id of this event
+ */
+ public static void execute(ExecutablePool pool,
+ Object[] holders, EventID eventId) {
+ AbstractOp op = new RegisterInstantiatorsOpImpl(holders,
+ eventId);
+ pool.execute(op, Integer.MAX_VALUE);
+ }
+
+ private RegisterInstantiatorsOp() {
+ // no instances allowed
+ }
+
+ private static class RegisterInstantiatorsOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RegisterInstantiatorsOpImpl(Instantiator[] instantiators,
+ EventID eventId) {
+ super(MessageType.REGISTER_INSTANTIATORS, instantiators.length * 3 + 1);
+ for(int i = 0; i < instantiators.length; i++) {
+ Instantiator instantiator = instantiators[i];
+ // strip '.class' off these class names
+ String className = instantiator.getClass().toString().substring(6);
+ String instantiatedClassName = instantiator.getInstantiatedClass().toString().substring(6);
+ try {
+ getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+ getMessage().addBytesPart(BlobHelper.serializeToBlob(instantiatedClassName));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(instantiator.getId());
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+// // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ ClientServerObserver bo = ClientServerObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException
+ * if serialization fails
+ */
+ public RegisterInstantiatorsOpImpl(Object[] holders,
+ EventID eventId) {
+ super(MessageType.REGISTER_INSTANTIATORS, holders.length * 3 + 1);
+ for (Object obj : holders) {
+ String instantiatorClassName = null;
+ String instantiatedClassName = null;
+ int id = 0;
+ if (obj instanceof Instantiator) {
+ instantiatorClassName = ((Instantiator)obj).getClass().getName();
+ instantiatedClassName = ((Instantiator)obj).getInstantiatedClass()
+ .getName();
+ id = ((Instantiator)obj).getId();
+ } else {
+ instantiatorClassName = ((InstantiatorAttributesHolder)obj)
+ .getInstantiatorClassName();
+ instantiatedClassName = ((InstantiatorAttributesHolder)obj)
+ .getInstantiatedClassName();
+ id = ((InstantiatorAttributesHolder)obj).getId();
+ }
+ try {
+ getMessage().addBytesPart(
+ BlobHelper.serializeToBlob(instantiatorClassName));
+ getMessage().addBytesPart(
+ BlobHelper.serializeToBlob(instantiatedClassName));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(id);
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+ // // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ ClientServerObserver bo = ClientServerObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "registerInstantiators");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRegisterInstantiators();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInstantiatorsSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInstantiators(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
index 2793f32,0000000..6f01b96
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
@@@ -1,99 -1,0 +1,99 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a Rollback on the server
+ * @since 6.6
+ * @author sbawaska
+ */
+public class RollbackOp {
+
+ /**
+ * Does a rollback on the server for given transaction
+ * @param pool the pool to use to communicate with the server.
+ * @param txId the id of the transaction to rollback
+ */
+ public static void execute(ExecutablePool pool, int txId) {
+ RollbackOpImpl op = new RollbackOpImpl(txId);
+ pool.execute(op);
+ }
+
+ private RollbackOp() {
+ // no instance allowed
+ }
+
+ private static class RollbackOpImpl extends AbstractOp {
+ private int txId;
+
+ protected RollbackOpImpl(int txId) {
+ super(MessageType.ROLLBACK, 1);
+ getMessage().setTransactionId(txId);
+ this.txId = txId;
+ }
+
+ @Override
+ public String toString() {
+ return "Rollback(txId="+this.txId+")";
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "rollback");
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXCEPTION;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRollback();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRollbackSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRollback(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
index 6d69083,0000000..42cc225
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
@@@ -1,92 -1,0 +1,92 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a region size on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class SizeOp {
+ /**
+ * Does a region size on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the entry keySet on
+ */
+ public static Integer execute(InternalPool pool,
+ String region)
+ {
+ AbstractOp op = new SizeOpImpl(region);
+ return (Integer)pool.execute(op);
+ }
+
+ private SizeOp() {
+ // no instances allowed
+ }
+
+ private static class SizeOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public SizeOpImpl(String region) {
+ super(MessageType.SIZE, 1);
+ getMessage().addStringPart(region);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+
+ return processObjResponse(msg, "size");
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.SIZE_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startSize();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endSizeSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endSize(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
index 1fecc7d,0000000..64ee66e
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
@@@ -1,93 -1,0 +1,93 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Indicates to the server that a transaction is
+ * failing over to this server. The server then
+ * performs the necessary bootstrapping for the tx.
+ * @author sbawaska
+ * @since 6.6
+ */
+public class TXFailoverOp {
+
+ public static void execute(ExecutablePool pool, int txId) {
+ pool.execute(new TXFailoverOpImpl(txId));
+ }
+
+ private TXFailoverOp() {
+ // no instance
+ }
+
+ private static class TXFailoverOpImpl extends AbstractOp {
+ int txId;
+
+ protected TXFailoverOpImpl(int txId) {
+ super(MessageType.TX_FAILOVER, 1);
+ getMessage().setTransactionId(txId);
+ this.txId = txId;
+ }
+
+ @Override
+ public String toString() {
+ return "TXFailoverOp(txId="+this.txId+")";
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "txFailover");
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXCEPTION;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startTxFailover();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endTxFailoverSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endTxFailover(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
index 48d66f2,0000000..34ecf4d
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
@@@ -1,163 -1,0 +1,163 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+/**
+ * TXSynchronizationOp sends JTA beforeCompletion and afterCompletion
+ * messages to the server pool.
+ *
+ * @author bruce
+ *
+ */
+public class TXSynchronizationOp {
+
+ public static enum CompletionType {
+ BEFORE_COMPLETION, AFTER_COMPLETION
+ }
+
+ /**
+ * @param pool
+ * @param status - the status of an afterCompletion notification
+ * @param txId - the transaction identifier
+ * @param type - BEFORE_COMPLETION or AFTER_COMPLETION
+ * @return the server's commit message
+ */
+ public static TXCommitMessage execute(InternalPool pool, int status, int txId, CompletionType type) {
+ Impl impl = new Impl(status, txId, type);
+ pool.execute(impl);
+ return impl.tXCommitMessageResponse;
+ }
+
+ static class Impl extends AbstractOp {
+
+ private int status;
+ private CompletionType type;
+ TXCommitMessage tXCommitMessageResponse;
+
+ /**
+ * @param status
+ * @param type
+ */
+ public Impl(int status, int txId, CompletionType type) {
+ super(MessageType.TX_SYNCHRONIZATION, (type==CompletionType.AFTER_COMPLETION)? 3 : 2);
+ this.status = status;
+ this.type = type;
+ getMessage().addIntPart(type.ordinal());
+ getMessage().addIntPart(txId);
+ if (type == CompletionType.AFTER_COMPLETION) {
+ getMessage().addIntPart(status);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "TXSynchronization(threadTxId=" + TXManagerImpl.getCurrentTXUniqueId()
+ +"; "+this.type + "; status=" + this.status + ")";
+ }
+
+ @Override
+ protected void processAck(Message msg, String opName)
+ throws Exception
+ {
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return;
+ } else {
+ Part part = msg.getPart(0);
+ if (msgType == MessageType.EXCEPTION) {
+ Throwable t = (Throwable) part.getObject();
+ if (t instanceof CommitConflictException ||
+ t instanceof SynchronizationCommitConflictException) {
+ throw (GemFireException)t;
+ }
+ }
+ super.processAck(msg, opName);
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#processResponse(com.gemstone.gemfire.internal.cache.tier.sockets.Message)
+ */
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ if (this.type == CompletionType.BEFORE_COMPLETION) {
+ try {
+ processAck(msg, type.toString());
+ } catch (ServerOperationException e) {
+ if (e.getCause() instanceof SynchronizationCommitConflictException) {
+ throw (SynchronizationCommitConflictException)e.getCause();
+ }
+ }
+ return null;
+ } else {
+ TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, this.type.toString());
+ this.tXCommitMessageResponse = rcs;
+ return rcs;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#isErrorResponse(int)
+ */
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startTxSynchronization();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endTxSynchronizationSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endTxSynchronization(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
- getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
++ getMessage().clearMessageHasSecurePartFlag();
+ getMessage().send(false);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
index 636dd91,0000000..d2fdbe7
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
@@@ -1,357 -1,0 +1,357 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+
++import com.gemstone.gemfire.internal.hll.ICardinality;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
- import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract class for {@link Hoplog} with common functionality
+ */
+public abstract class AbstractHoplog implements Hoplog {
+ protected final FSProvider fsProvider;
+
+ // path of the oplog file
+ protected volatile Path path;
+ private volatile HoplogDescriptor hfd;
+ protected Configuration conf;
+ protected SortedOplogStatistics stats;
+ protected Long hoplogModificationTime;
+ protected Long hoplogSize;
+
+ protected HoplogReaderActivityListener readerListener;
+
+ // logger instance
+ protected static final Logger logger = LogService.getLogger();
+
+ protected static String logPrefix;
+ // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
+ AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
+ throws IOException {
+ logPrefix = "<" + filePath.getName() + "> ";
+ this.fsProvider = new FSProvider(inputFS);
+ initialize(filePath, stats, inputFS);
+ }
+
+ public AbstractHoplog(HDFSStoreImpl store, Path filePath,
+ SortedOplogStatistics stats) throws IOException {
+ logPrefix = "<" + filePath.getName() + "> ";
+ this.fsProvider = new FSProvider(store);
+ initialize(filePath, stats, store.getFileSystem());
+ }
+
+ private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
+ this.conf = fs.getConf();
+ this.stats = stats;
+ this.path = fs.makeQualified(path);
+ this.hfd = new HoplogDescriptor(this.path.getName());
+ }
+
+ @Override
+ public abstract void close() throws IOException;
+ @Override
+ public abstract HoplogReader getReader() throws IOException;
+
+ @Override
+ public abstract HoplogWriter createWriter(int keys) throws IOException;
+
+ @Override
+ abstract public void close(boolean clearCache) throws IOException;
+
+ @Override
+ public void setReaderActivityListener(HoplogReaderActivityListener listener) {
+ this.readerListener = listener;
+ }
+
+ @Override
+ public String getFileName() {
+ return this.hfd.getFileName();
+ }
+
+ public final int compareTo(Hoplog o) {
+ return hfd.compareTo( ((AbstractHoplog)o).hfd);
+ }
+
+ @Override
+ public ICardinality getEntryCountEstimate() throws IOException {
+ return null;
+ }
+
+ @Override
+ public synchronized void rename(String name) throws IOException {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Renaming hoplog to " + name, logPrefix);
+ Path parent = path.getParent();
+ Path newPath = new Path(parent, name);
+ fsProvider.getFS().rename(path, new Path(parent, newPath));
+
+ // close the old reader and let the new one get created lazily
+ close();
+
+ // update path to point to the new path
+ path = newPath;
+ this.hfd = new HoplogDescriptor(this.path.getName());
+ logPrefix = "<" + path.getName() + "> ";
+ }
+
+ @Override
+ public synchronized void delete() throws IOException {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Deleting hoplog", logPrefix);
+ close();
+ this.hoplogModificationTime = null;
+ this.hoplogSize = null;
+ fsProvider.getFS().delete(path, false);
+ }
+
+ @Override
+ public long getModificationTimeStamp() {
+ initHoplogSizeTimeInfo();
+
+ // modification time will not be null if this hoplog is existing. Otherwise
+ // invocation of this method should is invalid
+ if (hoplogModificationTime == null) {
+ throw new IllegalStateException();
+ }
+
+ return hoplogModificationTime;
+ }
+
+ @Override
+ public long getSize() {
+ initHoplogSizeTimeInfo();
+
+ // size will not be null if this hoplog is existing. Otherwise
+ // invocation of this method should is invalid
+ if (hoplogSize == null) {
+ throw new IllegalStateException();
+ }
+
+ return hoplogSize;
+ }
+
+ private synchronized void initHoplogSizeTimeInfo() {
+ if (hoplogSize != null && hoplogModificationTime != null) {
+ // time and size info is already initialized. no work needed here
+ return;
+ }
+
+ try {
+ FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
+ if (filesInfo != null && filesInfo.length == 1) {
+ this.hoplogModificationTime = filesInfo[0].getModificationTime();
+ this.hoplogSize = filesInfo[0].getLen();
+ }
+ // TODO else condition may happen if user deletes hoplog from the file system.
+ } catch (IOException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
+ throw new HDFSIOException(
+ LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
+ }
+ }
+ public static SequenceFile.Writer getSequenceFileWriter(Path path,
+ Configuration conf, Logger logger) throws IOException {
+ return getSequenceFileWriter(path,conf, logger, null);
+ }
+
+ /**
+ *
+ * @param path
+ * @param conf
+ * @param logger
+ * @param version - is being used only for testing. Should be passed as null for other purposes.
+ * @return SequenceFile.Writer
+ * @throws IOException
+ */
+ public static SequenceFile.Writer getSequenceFileWriter(Path path,
+ Configuration conf, Logger logger, Version version) throws IOException {
+ Option optPath = SequenceFile.Writer.file(path);
+ Option optKey = SequenceFile.Writer.keyClass(BytesWritable.class);
+ Option optVal = SequenceFile.Writer.valueClass(BytesWritable.class);
+ Option optCom = withCompression(logger);
+ if (logger.isDebugEnabled())
+ logger.debug("{}Started creating hoplog " + path, logPrefix);
+
+ if (version == null)
+ version = Version.CURRENT;
+ //Create a metadata option with the gemfire version, for future versioning
+ //of the key and value format
+ SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+ metadata.set(new Text(Meta.GEMFIRE_VERSION.name()), new Text(String.valueOf(version.ordinal())));
+ Option optMeta = SequenceFile.Writer.metadata(metadata);
+
+ SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom, optMeta);
+
+ return writer;
+ }
+
+ private static Option withCompression(Logger logger) {
+ String prop = System.getProperty(HoplogConfig.COMPRESSION);
+ if (prop != null) {
+ CompressionCodec codec;
+ if (prop.equalsIgnoreCase("SNAPPY")) {
+ codec = new SnappyCodec();
+ } else if (prop.equalsIgnoreCase("LZ4")) {
+ codec = new Lz4Codec();
+ } else if (prop.equals("GZ")) {
+ codec = new GzipCodec();
+ } else {
+ throw new IllegalStateException("Unsupported codec: " + prop);
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("{}Using compression codec " + codec, logPrefix);
+ return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
+ }
+ return SequenceFile.Writer.compression(CompressionType.NONE, null);
+ }
+
+ public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
+ private final String fileName;
+ private final String bucket;
+ private final int sequence;
+ private final long timestamp;
+ private final String extension;
+
+ HoplogDescriptor(final String fileName) {
+ this.fileName = fileName;
+ final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
+ final boolean matched = matcher.find();
+ assert matched;
+ this.bucket = matcher.group(1);
+ this.sequence = Integer.valueOf(matcher.group(3));
+ this.timestamp = Long.valueOf(matcher.group(2));
+ this.extension = matcher.group(4);
+ }
+
+ public final String getFileName() {
+ return fileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof HoplogDescriptor)) {
+ return false;
+ }
+
+ final HoplogDescriptor other = (HoplogDescriptor)o;
+ // the two files should belong to same bucket
+ assert this.bucket.equals(other.bucket);
+
+ // compare sequence first
+ if (this.sequence != other.sequence) {
+ return false;
+ }
+
+ // sequence is same, compare timestamps
+ if (this.timestamp != other.timestamp) {
+ return false;
+ }
+
+ return extension.equals(other.extension);
+ }
+
+ @Override
+ public int compareTo(HoplogDescriptor o) {
+ if (this == o) {
+ return 0;
+ }
+
+ // the two files should belong to same bucket
+ assert this.bucket.equals(o.bucket);
+
+ // compare sequence first
+ if (sequence > o.sequence) {
+ return -1;
+ } else if (sequence < o.sequence) {
+ return 1;
+ }
+
+ // sequence is same, compare timestamps
+ if(timestamp > o.timestamp) {
+ return -1;
+ } else if (timestamp < o.timestamp) {
+ return 1;
+ }
+
+ //timestamp is the same, compare the file extension. It's
+ //possible a major compaction and minor compaction could finish
+ //at the same time and create the same timestamp and sequence number
+ //it doesn't matter which file we look at first in that case.
+ return extension.compareTo(o.extension);
+ }
+
+
+ }
+
+ protected static final class FSProvider {
+ final FileSystem fs;
+ final HDFSStoreImpl store;
+
+ // THIS METHOD IS FOR TESTING ONLY
+ FSProvider(FileSystem fs) {
+ this.fs = fs;
+ this.store = null;
+ }
+
+ FSProvider(HDFSStoreImpl store) {
+ this.store = store;
+ fs = null;
+ }
+
+ public FileSystem getFS() throws IOException {
+ if (store != null) {
+ return store.getFileSystem();
+ }
+ return fs;
+ }
+
+ public FileSystem checkFileSystem() {
+ store.checkAndClearFileSystem();
+ return store.getCachedFileSystem();
+ }
+ }
+}