You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:15 UTC
[027/100] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
index 6d9ca49,0000000..018d05b
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
@@@ -1,436 -1,0 +1,435 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionHelper;
- import com.gemstone.gemfire.cache.client.internal.Op;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.operations.ExecuteFunctionOperationContext;
- import com.gemstone.gemfire.i18n.LogWriterI18n;
++import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DistributedRegion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender65;
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+/**
+ * Server-side {@link Command} that executes a client-requested function on a
+ * region using the 6.6 client/server wire protocol. It decodes the request
+ * message, authorizes the caller, constructs a partitioned- or
+ * distributed-region execution, runs the function, and streams results or
+ * exceptions back to the client in chunks.
+ *
+ * @author kbachhav
+ * @since 6.6
+ */
+public class ExecuteRegionFunction66 extends BaseCommand {
+
+ // Commands are stateless; a single shared instance serves every connection.
+ private final static ExecuteRegionFunction66 singleton = new ExecuteRegionFunction66();
+
+ public static Command getCommand() {
+ return singleton;
+ }
+
+ private ExecuteRegionFunction66() {
+ }
+
+ /**
+  * Decodes and executes one execute-region-function request.
+  *
+  * Request part layout: 0 = functionState byte (plus an optional 4-byte
+  * function timeout for clients at or above GFE 8.0.0.9), 1 = region name,
+  * 2 = function id (String) or serialized Function, 3 = arguments,
+  * 4 = optional MemberMappedArgument, 5 = flags byte, 6 = filter size,
+  * then the filter keys, then the removed-nodes count and entries.
+  */
+ @Override
+ public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ throws IOException {
+ String regionName = null;
+ Object function = null;
+ Object args = null;
+ MemberMappedArgument memberMappedArg = null;
+ final boolean isBucketsAsFilter ;
+ final byte isReExecute ;
+ Set<Object> filter = null;
+ byte hasResult = 0;
+ int removedNodesSize = 0;
+ Set<Object> removedNodesSet = null;
+ int filterSize = 0, partNumber = 0;
+ CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ byte functionState = 0;
+ int functionTimeout = GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
+ try {
+ // Part 0: functionState packs the HA / hasResult / optimizeForWrite bits
+ // (compare AbstractExecution.getFunctionState below).
+ byte[] bytes = msg.getPart(0).getSerializedForm();
+ functionState = bytes[0];
+ if (bytes.length >= 5 && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) {
+ functionTimeout = Part.decodeInt(bytes, 1);
+ }
+ // NOTE(review): when the hasResult bit (0x2) is clear this yields -1, not
+ // 0; the later "hasResult == 1" checks treat both values the same.
+ if (functionState != 1) {
+ hasResult = (byte)((functionState & 2) - 1);
+ }
+ else {
+ hasResult = functionState;
+ }
+ if (hasResult == 1) {
+ servConn.setAsTrue(REQUIRES_RESPONSE);
+ servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+ }
+ regionName = msg.getPart(1).getString();
+ function = msg.getPart(2).getStringOrObject();
+ args = msg.getPart(3).getObject();
+ Part part = msg.getPart(4);
+ if (part != null) {
+ Object obj = part.getObject();
+ if (obj instanceof MemberMappedArgument) {
+ memberMappedArg = (MemberMappedArgument)obj;
+ }
+ }
+ // Clients newer than 8.1 pack reExecute and bucketsAsFilter into one
+ // flags byte; older clients send the reExecute byte directly.
+ byte[] flags =msg.getPart(5).getSerializedForm();
+ if(servConn.getClientVersion().ordinal() > Version.GFE_81.ordinal()) {
+ isBucketsAsFilter = (flags[0] & ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) != 0 ;
+ isReExecute = (flags[0] & ExecuteFunctionHelper.IS_REXECUTE_MASK) != 0 ? (byte)1 : 0;
+ }else {
+ isReExecute = flags[0];
+ isBucketsAsFilter = false;
+ }
+ filterSize = msg.getPart(6).getInt();
+ if (filterSize != 0) {
+ filter = new HashSet<Object>();
+ partNumber = 7;
+ for (int i = 0; i < filterSize; i++) {
+ filter.add(msg.getPart(partNumber + i).getStringOrObject());
+ }
+ }
+
+ partNumber = 7 + filterSize;
+ removedNodesSize = msg.getPart(partNumber).getInt();
+
+ if (removedNodesSize != 0) {
+ removedNodesSet = new HashSet<Object>();
+ partNumber = partNumber + 1;
+
+ for (int i = 0; i < removedNodesSize; i++) {
+ removedNodesSet.add(msg.getPart(partNumber + i).getStringOrObject());
+ }
+ }
+
+ }
+ catch (ClassNotFoundException exception) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception);
+ if (hasResult == 1) {
+ writeChunkedException(msg, exception, false, servConn);
+ }
+ else {
+ writeException(msg, exception, false, servConn);
+ }
+ servConn.setAsTrue(RESPONDED);
+ return;
+ }
+ if (function == null || regionName == null) {
+ String message = null;
+ if (function == null) {
+ message = LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
+ .toLocalizedString("function");
+ }
+ if (regionName == null) {
+ message = LocalizedStrings.ExecuteRegionFunction_THE_INPUT_0_FOR_THE_EXECUTE_FUNCTION_REQUEST_IS_NULL
+ .toLocalizedString("region");
+ }
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ }
+ else {
+ Region region = crHelper.getRegion(regionName);
+ if (region == null) {
+ String message = LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST
+ .toLocalizedString(regionName);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ }
+ // Temporarily apply the client's function timeout as the connection's
+ // read timeout; the original value is restored in the finally block.
+ HandShake handShake = (HandShake)servConn.getHandshake();
+ int earlierClientReadTimeout = handShake.getClientReadTimeout();
+ handShake.setClientReadTimeout(functionTimeout);
+ ServerToClientFunctionResultSender resultSender = null;
+ Function functionObject = null;
+ try {
+ if (function instanceof String) {
+ functionObject = FunctionService.getFunction((String)function);
+ if (functionObject == null) {
+ String message = LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED
+ .toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ }
+ else {
+ // Reject the request if the client's view of the function's
+ // HA/hasResult/optimizeForWrite attributes disagrees with the
+ // registered function on this server.
+ byte functionStateOnServerSide = AbstractExecution
+ .getFunctionState(functionObject.isHA(), functionObject
+ .hasResult(), functionObject.optimizeForWrite());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Function State on server side: {} on client: {}", functionStateOnServerSide, functionState);
+ }
+ if (functionStateOnServerSide != functionState) {
+ String message = LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER
+ .toLocalizedString(function);
+ logger.warn("{}: {}", servConn.getName(), message);
+ sendError(hasResult, msg, message, servConn);
+ return;
+ }
+ }
+ }
+ else {
+ functionObject = (Function)function;
+ }
+ // check if the caller is authorized to do this operation on server
+ AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ final String functionName = functionObject.getId();
+ final String regionPath = region.getFullPath();
+ ExecuteFunctionOperationContext executeContext = null;
+ if (authzRequest != null) {
+ executeContext = authzRequest.executeFunctionAuthorize(functionName,
+ regionPath, filter, args, functionObject.optimizeForWrite());
+ }
+
+ // Construct execution
+ AbstractExecution execution = (AbstractExecution)FunctionService
+ .onRegion(region);
+ ChunkedMessage m = servConn.getFunctionResponseMessage();
+ m.setTransactionId(msg.getTransactionId());
+ resultSender = new ServerToClientFunctionResultSender65(m,
+ MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn,
+ functionObject, executeContext);
+
+ if (execution instanceof PartitionedRegionFunctionExecutor) {
+ // Single-key filter with results: sets a flag on the connection —
+ // presumably hinting that execution can stay on this node only.
+ // NOTE(review): cleared unconditionally in the finally block below.
+ if((hasResult == 1) && filter!= null &&filter.size() == 1) {
+ ServerConnection.executeFunctionOnLocalNodeOnly((byte)1);
+ }
+ execution = new PartitionedRegionFunctionExecutor(
+ (PartitionedRegion)region, filter, args, memberMappedArg,
+ resultSender, removedNodesSet, isBucketsAsFilter);
+ }
+ else {
+ execution = new DistributedRegionFunctionExecutor(
+ (DistributedRegion)region, filter, args, memberMappedArg,
+ resultSender);
+ }
+ if (isReExecute == 1) {
+ execution = execution.setIsReExecute();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing Function: {} on Server: {} with Execution: {} functionState={} reExecute={} hasResult={}", functionObject.getId(), servConn, execution, functionState, isReExecute, hasResult);
+ }
+ if (hasResult == 1) {
+ if (function instanceof String) {
+ switch (functionState) {
+ case AbstractExecution.NO_HA_HASRESULT_NO_OPTIMIZEFORWRITE:
+ execution.execute((String)function, true, false, false)
+ .getResult();
+ break;
+ case AbstractExecution.HA_HASRESULT_NO_OPTIMIZEFORWRITE:
+ execution.execute((String)function, true, true, false)
+ .getResult();
+ break;
+ case AbstractExecution.HA_HASRESULT_OPTIMIZEFORWRITE:
+ execution.execute((String)function, true, true, true)
+ .getResult();
+ break;
+ case AbstractExecution.NO_HA_HASRESULT_OPTIMIZEFORWRITE:
+ execution.execute((String)function, true, false, true)
+ .getResult();
+ break;
+ }
+ }
+ else {
+ execution.execute(functionObject).getResult();
+ }
+ }
+ else {
+ if (function instanceof String) {
+ switch (functionState) {
+ case AbstractExecution.NO_HA_NO_HASRESULT_NO_OPTIMIZEFORWRITE:
+ execution.execute((String)function, false, false, false);
+ break;
+ case AbstractExecution.NO_HA_NO_HASRESULT_OPTIMIZEFORWRITE:
+ execution.execute((String)function, false, false, true);
+ break;
+ }
+ }
+ else {
+ execution.execute(functionObject);
+ }
+ writeReply(msg, servConn);
+ }
+ }
+ catch (IOException ioe) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), ioe);
+ final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY
+ .toLocalizedString();
+ sendException(hasResult, msg, message, servConn, ioe);
+ }
+ catch (FunctionException fe) {
+ String message = fe.getMessage();
- 
- if (fe.getCause() instanceof FunctionInvocationTargetException) {
- if (fe.getCause() instanceof InternalFunctionInvocationTargetException) {
++ Object cause = fe.getCause();
++ if (cause instanceof FunctionInvocationTargetException || cause instanceof QueryInvocationTargetException) {
++ if (cause instanceof InternalFunctionInvocationTargetException) {
+ // Fix for #44709: User should not be aware of
+ // InternalFunctionInvocationTargetException. No instance of
+ // InternalFunctionInvocationTargetException is giving useful
+ // information to user to take any corrective action hence logging
+ // this at fine level logging
+ // 1> When bucket is moved
+ // 2> In case of HA a FunctionInvocationTargetException is thrown. Since
+ // it is HA, the function will be re-executed on the right node
+ // 3> Multiple target nodes found for single hop operation
+ // 4> In case of HA a member departed
+ if (logger.isDebugEnabled()) {
+ logger.debug(LocalizedMessage.create(LocalizedStrings.ExecuteFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, new Object[] { function }), fe);
+ }
+ }
+ else if (functionObject.isHA()) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function + " :" + message));
+ }
+ else {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe);
+ }
+
+ // Target-invocation failures go back through the result sender so an HA
+ // client can retry; all other function failures are sent as exceptions.
+ resultSender.setException(fe);
+ }
+ else {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe);
+ sendException(hasResult, msg, message, servConn, fe);
+ }
+
+ }
+ catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e);
+ String message = e.getMessage();
+ sendException(hasResult, msg, message, servConn, e);
+ }
+
+ finally {
+ // Always restore the original read timeout and clear the
+ // local-node-only execution flag, whatever the execution outcome.
+ handShake.setClientReadTimeout(earlierClientReadTimeout);
+ ServerConnection.executeFunctionOnLocalNodeOnly((byte)0);
+ }
+ }
+ }
+
+ /**
+  * Reports an exception to the client: as a chunked function-response
+  * exception when results were expected (hasResult == 1), otherwise as a
+  * plain error reply. Synchronized on the message to serialize writes.
+  */
+ private void sendException(byte hasResult, Message msg, String message,
+ ServerConnection servConn, Throwable e) throws IOException {
+ synchronized (msg) {
+ if (hasResult == 1) {
+ writeFunctionResponseException(msg, MessageType.EXCEPTION, message,
+ servConn, e);
+ }
+ else {
+ writeException(msg, e, false, servConn);
+ }
+ servConn.setAsTrue(RESPONDED);
+ }
+ }
+
+ /**
+  * Reports a request-level error (bad input, unknown region/function) to the
+  * client, chunked or plain depending on whether results were expected.
+  */
+ private void sendError(byte hasResult, Message msg, String message,
+ ServerConnection servConn) throws IOException {
+ synchronized (msg) {
+ if (hasResult == 1) {
+ writeFunctionResponseError(msg,
+ MessageType.EXECUTE_REGION_FUNCTION_ERROR, message, servConn);
+ }
+ else {
+ writeErrorResponse(msg, MessageType.EXECUTE_REGION_FUNCTION_ERROR,
+ message, servConn);
+ }
+ servConn.setAsTrue(RESPONDED);
+ }
+ }
+
+ /**
+  * Sends an exception to the client over the chunked function-response
+  * channel. If the response header has already been sent, the exception is
+  * appended as the final chunk of the in-progress message; otherwise a new
+  * chunked response is started. When the cause is an
+  * InternalFunctionInvocationTargetException, a third part carrying the
+  * failed-node set is included so the client can retry on other members.
+  */
+ protected static void writeFunctionResponseException(Message origMsg,
+ int messageType, String message, ServerConnection servConn, Throwable e)
+ throws IOException {
+ ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ int numParts = 0;
+ if (functionResponseMsg.headerHasBeenSent()) {
+ if (e instanceof FunctionException
+ && e.getCause() instanceof InternalFunctionInvocationTargetException) {
+ functionResponseMsg.setNumberOfParts(3);
+ functionResponseMsg.addObjPart(e);
+ functionResponseMsg.addStringPart(BaseCommand.getExceptionTrace(e));
+ InternalFunctionInvocationTargetException fe = (InternalFunctionInvocationTargetException)e
+ .getCause();
+ functionResponseMsg.addObjPart(fe.getFailedNodeSet());
+ numParts = 3;
+ }
+ else {
+ functionResponseMsg.setNumberOfParts(2);
+ functionResponseMsg.addObjPart(e);
+ functionResponseMsg.addStringPart(BaseCommand.getExceptionTrace(e));
+ numParts = 2;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk while reply in progress: ", servConn.getName(), e);
+ }
+ functionResponseMsg.setServerConnection(servConn);
+ functionResponseMsg.setLastChunkAndNumParts(true, numParts);
+ // functionResponseMsg.setLastChunk(true);
+ functionResponseMsg.sendChunk(servConn);
+ }
+ else {
+ chunkedResponseMsg.setMessageType(messageType);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ if (e instanceof FunctionException
+ && e.getCause() instanceof InternalFunctionInvocationTargetException) {
+ chunkedResponseMsg.setNumberOfParts(3);
+ chunkedResponseMsg.addObjPart(e);
+ chunkedResponseMsg.addStringPart(BaseCommand.getExceptionTrace(e));
+ InternalFunctionInvocationTargetException fe = (InternalFunctionInvocationTargetException)e
+ .getCause();
+ chunkedResponseMsg.addObjPart(fe.getFailedNodeSet());
+ numParts = 3;
+ }
+ else {
+ chunkedResponseMsg.setNumberOfParts(2);
+ chunkedResponseMsg.addObjPart(e);
+ chunkedResponseMsg.addStringPart(BaseCommand.getExceptionTrace(e));
+ numParts = 2;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk: ", servConn.getName(), e);
+ }
+ chunkedResponseMsg.setServerConnection(servConn);
+ chunkedResponseMsg.setLastChunkAndNumParts(true, numParts);
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 60afc14,0000000..77f9596
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@@ -1,222 -1,0 +1,223 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.xmlcache;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+
+/**
+ * Declarative (cache.xml) stand-in for an {@link AsyncEventQueue}: records the
+ * attributes parsed during cache creation so the real queue can be built
+ * later. It holds no runtime queue, so the lifecycle methods are no-ops,
+ * {@link #size()} is always 0 and {@link #isPrimary()} always reports true.
+ */
+public class AsyncEventQueueCreation implements AsyncEventQueue {
+
+ private String id = null;
+ private List<GatewayEventFilter> gatewayEventFilters = new ArrayList<GatewayEventFilter>();
+ private GatewayEventSubstitutionFilter gatewayEventSubstitutionFilter = null;
+ private AsyncEventListener asyncEventListener = null;
+ private int batchSize = 0;
+ private int batchTimeInterval = 0;
+ private boolean isBatchConflationEnabled = false;
+ private boolean isPersistent = false;
+ private String diskStoreName = null;
+ private boolean isDiskSynchronous = false;
+ private int maxQueueMemory = 0;
+ private boolean isParallel = false;
+ private boolean isBucketSorted = false;
+ private boolean isHDFSQueue = false;
+ private int dispatcherThreads = 1;
+ private OrderPolicy orderPolicy = OrderPolicy.KEY;
+
+ public AsyncEventQueueCreation() {
+ }
+
+ /**
+  * Copies the queue-relevant attributes (batching, persistence, disk,
+  * parallelism, dispatch, filtering) from the parsed gateway-sender
+  * attributes into this creation object.
+  */
+ public AsyncEventQueueCreation(String id, GatewaySenderAttributes senderAttrs, AsyncEventListener eventListener) {
+ this.id = id;
+ this.batchSize = senderAttrs.batchSize;
+ this.batchTimeInterval = senderAttrs.batchTimeInterval;
+ this.isBatchConflationEnabled = senderAttrs.isBatchConflationEnabled;
+ this.isPersistent = senderAttrs.isPersistenceEnabled;
+ this.diskStoreName = senderAttrs.diskStoreName;
+ this.isDiskSynchronous = senderAttrs.isDiskSynchronous;
+ this.maxQueueMemory = senderAttrs.maximumQueueMemory;
+ this.isParallel = senderAttrs.isParallel;
+ this.dispatcherThreads = senderAttrs.dispatcherThreads;
+ this.orderPolicy = senderAttrs.policy;
+ this.asyncEventListener = eventListener;
+ this.isBucketSorted = senderAttrs.isBucketSorted;
+ this.isHDFSQueue = senderAttrs.isHDFSQueue;
++ this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
+ }
+
+ @Override
+ public AsyncEventListener getAsyncEventListener() {
+ return this.asyncEventListener;
+ }
+
+ public void setAsyncEventListener(AsyncEventListener eventListener) {
+ this.asyncEventListener = eventListener;
+ }
+
+ public void addGatewayEventFilter(
+ GatewayEventFilter filter) {
+ this.gatewayEventFilters.add(filter);
+ }
+
+ public List<GatewayEventFilter> getGatewayEventFilters() {
+ return this.gatewayEventFilters;
+ }
+
+ public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
+ return this.gatewayEventSubstitutionFilter;
+ }
+
+ public void setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter) {
+ this.gatewayEventSubstitutionFilter = filter;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return this.batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public int getBatchTimeInterval() {
+ return this.batchTimeInterval;
+ }
+
+ public void setBatchTimeInterval(int batchTimeInterval) {
+ this.batchTimeInterval = batchTimeInterval;
+ }
+
+ @Override
+ public boolean isBatchConflationEnabled() {
+ return this.isBatchConflationEnabled;
+ }
+
+ public void setBatchConflationEnabled(boolean batchConflationEnabled) {
+ this.isBatchConflationEnabled = batchConflationEnabled;
+ }
+
+ @Override
+ public String getDiskStoreName() {
+ return this.diskStoreName;
+ }
+
+ public void setDiskStoreName(String diskStore) {
+ this.diskStoreName = diskStore;
+ }
+
+ @Override
+ public boolean isDiskSynchronous() {
+ return this.isDiskSynchronous;
+ }
+
+ public void setDiskSynchronous(boolean diskSynchronous) {
+ this.isDiskSynchronous = diskSynchronous;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public int getMaximumQueueMemory() {
+ return this.maxQueueMemory;
+ }
+
+ public void setMaximumQueueMemory(int maxQueueMemory) {
+ this.maxQueueMemory = maxQueueMemory;
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return this.isPersistent;
+ }
+
+ public void setPersistent(boolean isPersistent) {
+ this.isPersistent = isPersistent;
+ }
+
+ public void setParallel(boolean isParallel) {
+ this.isParallel = isParallel;
+ }
+
+ @Override
+ public int getDispatcherThreads() {
+ return this.dispatcherThreads;
+ }
+
+ public void setDispatcherThreads(int numThreads) {
+ this.dispatcherThreads = numThreads;
+ }
+
+ @Override
+ public OrderPolicy getOrderPolicy() {
+ return this.orderPolicy;
+ }
+
+ public void setOrderPolicy(OrderPolicy policy) {
+ this.orderPolicy = policy;
+ }
+
+ // A creation-time object has no runtime queue state; it always reports
+ // itself as primary and empty, and its lifecycle methods do nothing.
+ @Override
+ public boolean isPrimary() {
+ return true;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ public void start() {};
+ public void stop() {};
+ public void destroy() {};
+ public void pause() {};
+ public void resume() {}
+
+ public boolean isParallel() {
+ return this.isParallel;
+ }
+
+ public boolean isBucketSorted() {
+ return this.isBucketSorted;
+ }
+
+ public void setBucketSorted(boolean isBucketSorted) {
+ this.isBucketSorted = isBucketSorted;
+ }
+ public boolean isHDFSQueue() {
+ return this.isHDFSQueue;
+ }
+
+ public void setIsHDFSQueue(boolean isHDFSQueue) {
+ this.isHDFSQueue = isHDFSQueue;
+ }
+}