You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/21 00:29:27 UTC
[5/6] incubator-geode git commit: classes for implementing UDA
functionality
classes for implementing UDA functionality
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/42fb6fc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/42fb6fc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/42fb6fc0
Branch: refs/heads/feature/GEODE-1269
Commit: 42fb6fc0cff310c976f7ab1e5621c59ad4aabf01
Parents: 8bc7afa
Author: Asif Shahid <as...@snappydata.io>
Authored: Thu Apr 14 22:12:25 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Thu Apr 14 22:12:25 2016 -0700
----------------------------------------------------------------------
.../gemfire/cache/query/UDAExistsException.java | 43 ++
.../query/internal/CompiledUDAFunction.java | 67 +++
.../aggregate/uda/UDADistributionAdvisor.java | 187 +++++++++
.../internal/aggregate/uda/UDAManager.java | 35 ++
.../internal/aggregate/uda/UDAManagerImpl.java | 229 +++++++++++
.../internal/aggregate/uda/UDAMessage.java | 106 +++++
.../cache/xmlcache/UDAManagerCreation.java | 44 ++
.../cache/query/dunit/UDACreationDUnitTest.java | 403 +++++++++++++++++++
.../gemfire/cache/query/dunit/UDADUnitImpl.java | 122 ++++++
.../dunit/UDAPartitionedQueryDUnitTest.java | 102 +++++
.../functional/UDAPartitionedJUnitTest.java | 40 ++
.../functional/UDAReplicatedJUnitTest.java | 34 ++
.../cache/query/functional/UDATestImpl.java | 140 +++++++
.../query/functional/UDATestInterface.java | 22 +
14 files changed, 1574 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
new file mode 100644
index 0000000..e40a80a
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query;
+
+/**
+ * Signals that a user-defined aggregate (UDA) cannot be registered because
+ * another UDA is already registered in the system under the same name.
+ *
+ * @author ashahid
+ */
+public class UDAExistsException extends QueryException {
+
+  private static final long serialVersionUID = -1643528839352144L;
+
+  /**
+   * Creates the exception with an error message only.
+   *
+   * @param msg the error message
+   */
+  public UDAExistsException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates the exception with an error message and the underlying cause.
+   *
+   * @param msg the error message
+   * @param cause the Throwable that caused this exception
+   */
+  public UDAExistsException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
new file mode 100644
index 0000000..e00b909
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query.internal;
+
+import com.gemstone.gemfire.cache.CacheRuntimeException;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Represents the UserDefinedAggregate function node
+ * @author ashahid
+ * @since 9.0
+ *
+ */
+public class CompiledUDAFunction extends CompiledAggregateFunction {
+ private final String udaName;
+
+ public CompiledUDAFunction(CompiledValue expr, int aggFunc, String name) {
+ super(expr, aggFunc);
+ this.udaName = name;
+ }
+
+ @Override
+ public Aggregator evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+ QueryInvocationTargetException {
+ Class<Aggregator> aggregatorClass = ((GemFireCacheImpl) context.getCache()).getUDAManager().getUDAClass(this.udaName);
+ try {
+ return aggregatorClass.newInstance();
+ } catch (Exception e) {
+ throw new CacheRuntimeException(e) {
+ };
+ }
+
+ }
+
+ @Override
+ public ObjectType getObjectType() {
+ return new ObjectTypeImpl(Object.class);
+ }
+
+ @Override
+ protected String getStringRep() {
+ return this.udaName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
new file mode 100644
index 0000000..2d65cb7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Handles the exchange of {@link UDAProfile}s with the peers.
+ * It is used for exchanging information about the UDAs (user-defined aggregates)
+ * existing in the system.
+ *
+ * @author ashahid
+ * @since 9.0
+ */
+public class UDADistributionAdvisor extends DistributionAdvisor {
+
+  private UDADistributionAdvisor(DistributionAdvisee sender) {
+    super(sender);
+  }
+
+  /**
+   * Creates an advisor for the given advisee and initializes it before returning it.
+   *
+   * @param sender the advisee this advisor works for
+   * @return the initialized advisor
+   */
+  public static UDADistributionAdvisor createUDAAdvisor(DistributionAdvisee sender) {
+    UDADistributionAdvisor advisor = new UDADistributionAdvisor(sender);
+    advisor.initialize();
+    return advisor;
+  }
+
+  /** Instantiate a new {@link UDAProfile} for the given member. */
+  @Override
+  protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
+    return new UDAProfile(memberId, version);
+  }
+
+  /**
+   * Picks the single member with which profiles are exchanged.
+   * <p>
+   * The distribution manager's elder is used unless it is unknown, is this member,
+   * or is one of the hosted locators. In those cases the first member reported by
+   * the cache's distribution advisor that is not a hosted locator is used instead.
+   *
+   * @return a singleton set with the chosen member, or an empty set if none qualifies
+   */
+  @Override
+  public Set<InternalDistributedMember> adviseProfileExchange() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DM dm = ids.getDistributionManager();
+    InternalDistributedMember elder = dm.getElderId();
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    if (elder == null || elder.equals(dm.getId()) || locators.contains(elder)) {
+      elder = null;
+      Set<InternalDistributedMember> allMembers = gfc.getDistributionAdvisor().adviseGeneric();
+      for (InternalDistributedMember candidate : allMembers) {
+        if (!locators.contains(candidate)) {
+          elder = candidate;
+          break;
+        }
+      }
+    }
+    if (elder != null) {
+      return Collections.singleton(elder);
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Returns the members that should receive profile updates: every member reported
+   * by the cache's distribution advisor except the hosted locators.
+   *
+   * @return a new set owned by the caller
+   */
+  @Override
+  public Set<InternalDistributedMember> adviseProfileUpdate() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();
+    DM dm = ids.getDistributionManager();
+
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    // Copy before filtering, as UDAMessage.send() does: the set handed out by
+    // adviseGeneric() is not owned by this method and must not be modified here.
+    // (Fully qualified because this file does not import java.util.HashSet.)
+    Set<InternalDistributedMember> all =
+        new java.util.HashSet<InternalDistributedMember>(gfc.getDistributionAdvisor().adviseGeneric());
+    all.removeAll(locators);
+    return all;
+  }
+
+  /**
+   * Merges the UDAs carried by a remote profile into the local UDA manager.
+   * The profile itself is not stored by this method.
+   *
+   * @param profile
+   *          the received profile; must be a {@link UDAProfile}
+   * @return always {@code true}
+   */
+  @Override
+  public boolean putProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    GemFireCacheImpl.getExisting().getUDAManager().addUDAs(udap.udas, udap.getCheckforLocalOnlyUDAs());
+    return true;
+  }
+
+  /**
+   * Does nothing.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean removeProfile(Profile profile, boolean destroyed) {
+    return true;
+  }
+
+  /**
+   * @return always {@code false}
+   */
+  @Override
+  public boolean initializationGate() {
+    return false;
+  }
+
+  /**
+   * Profile information for a remote counterpart: a map from UDA name to the name
+   * of its class, plus a flag telling the receiver whether it should check for UDAs
+   * that exist only on its side.
+   */
+  public static final class UDAProfile extends DistributionAdvisor.Profile {
+
+    // Not final: replaced wholesale by fromData().
+    private HashMap<String, String> udas = new HashMap<String, String>();
+    private boolean checkForLocalOnlyUDAs = false;
+
+    public UDAProfile(InternalDistributedMember memberId, int version) {
+      super(memberId, version);
+    }
+
+    /** No-arg constructor (presumably required for deserialization -- TODO confirm). */
+    public UDAProfile() {
+    }
+
+    void setCheckForLocalOnlyUDAsAsTrue() {
+      this.checkForLocalOnlyUDAs = true;
+    }
+
+    boolean getCheckforLocalOnlyUDAs() {
+      return this.checkForLocalOnlyUDAs;
+    }
+
+    /**
+     * Adds one UDA to this profile.
+     *
+     * @param udaName the UDA name
+     * @param udaClass the name of the class implementing the UDA
+     */
+    void putInProfile(String udaName, String udaClass) {
+      this.udas.put(udaName, udaClass);
+    }
+
+    /** Reads the superclass state, then the UDA map, then the flag (mirror of toData). */
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.udas = DataSerializer.readHashMap(in);
+      this.checkForLocalOnlyUDAs = DataSerializer.readPrimitiveBoolean(in);
+    }
+
+    /** Writes the superclass state, then the UDA map, then the flag. */
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeHashMap(this.udas, out);
+      DataSerializer.writePrimitiveBoolean(this.checkForLocalOnlyUDAs, out);
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      return null;
+    }
+
+    @Override
+    public int getDSFID() {
+      return UDA_PROFILE;
+    }
+
+    /** Hands this profile to the local UDA manager via handleDistributionAdvisee. */
+    @Override
+    public void processIncoming(DistributionManager dm, String adviseePath, boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
+      handleDistributionAdvisee(GemFireCacheImpl.getExisting().getUDAManager(), removeProfile, exchangeProfiles, replyProfiles);
+    }
+
+    /**
+     * Adds the local UDA manager's profile to the reply, with the
+     * check-for-local-only flag switched on.
+     */
+    @Override
+    public void collectProfile(DistributionManager dm, String adviseePath, final List<Profile> replyProfiles) {
+      UDAProfile udap = (UDAProfile) GemFireCacheImpl.getExisting().getUDAManager().getProfile();
+      udap.setCheckForLocalOnlyUDAsAsTrue();
+      replyProfiles.add(udap);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
new file mode 100644
index 0000000..f6a462e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+
+/**
+ * Interface for creating and removing UDAs (user-defined aggregates).
+ *
+ * @see UDAManagerImpl
+ * @author ashahid
+ * @since 9.0
+ */
+public interface UDAManager {
+
+  /**
+   * @return the known UDAs, keyed by UDA name, with the fully qualified name of the
+   *         implementing class as value
+   */
+  Map<String, String> getUDAs();
+
+  /**
+   * Registers a UDA.
+   *
+   * @param name the name of the UDA
+   * @param fqClass the fully qualified name of the class implementing the UDA
+   * @throws UDAExistsException if a UDA with that name is already registered
+   *         (see {@link UDAManagerImpl})
+   * @throws NameResolutionException if the class cannot be found
+   *         (see {@link UDAManagerImpl})
+   */
+  void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException;
+
+  /**
+   * Removes the UDA registered under the given name.
+   *
+   * @param name the name of the UDA
+   */
+  void removeUDA(String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
new file mode 100644
index 0000000..964922e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDADistributionAdvisor.UDAProfile;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Member-local registry of UDAs (user-defined aggregates). It maps each UDA name to
+ * its {@link Aggregator} class and, as a {@link DistributionAdvisee}, supplies the
+ * profile through which that mapping is exchanged with other members.
+ * <p>
+ * Compound updates of the registry are made while holding this object's monitor.
+ */
+public class UDAManagerImpl implements DistributionAdvisee, UDAManager {
+
+  /** UDA name -> aggregator class. */
+  private final ConcurrentMap<String, Class<Aggregator>> map;
+
+  private final DistributionAdvisor advisor;
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Hand-off slot from {@link #addUDAs} to {@link #fillInProfile} on the same thread:
+   * while it holds a map, the next profile filled in on this thread carries only
+   * those UDAs instead of the whole registry.
+   */
+  private static final ThreadLocal<Map<String, Class<Aggregator>>> localOnlyUDAS = new ThreadLocal<Map<String, Class<Aggregator>>>();
+
+  public UDAManagerImpl() {
+    this.map = new ConcurrentHashMap<String, Class<Aggregator>>();
+    this.advisor = UDADistributionAdvisor.createUDAAdvisor(this);
+  }
+
+  /**
+   * Registers the UDA on this member and then sends a {@link UDAMessage} so that
+   * other members register it too. Nothing is sent if local registration fails.
+   *
+   * @param name the UDA name
+   * @param fqClass the fully qualified name of the class implementing the UDA
+   * @throws UDAExistsException if the name is already registered on this member
+   * @throws NameResolutionException if the class cannot be loaded
+   */
+  @Override
+  public void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException {
+    createUDALocally(name, fqClass);
+    new UDAMessage(name, fqClass).send();
+  }
+
+  /**
+   * @return a new map from UDA name to the name of the registered class
+   */
+  @Override
+  public Map<String, String> getUDAs() {
+    Map<String, String> mapping = new HashMap<String, String>();
+    for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+      mapping.put(entry.getKey(), entry.getValue().getName());
+    }
+    return mapping;
+  }
+
+  /**
+   * Registers the UDA on this member only.
+   *
+   * @param name the UDA name
+   * @param fqClass the fully qualified name of the class implementing the UDA
+   * @throws NameResolutionException if the class cannot be loaded
+   * @throws UDAExistsException if the name is already registered
+   */
+  public void createUDALocally(String name, String fqClass) throws NameResolutionException, UDAExistsException {
+    synchronized (this) {
+      if (this.map.containsKey(name)) {
+        throw new UDAExistsException(LocalizedStrings.UDA_MANAGER_Uda_Exists.toLocalizedString(name));
+      }
+      try {
+        // NOTE(review): the cast is unchecked; nothing here verifies that the
+        // loaded class actually implements Aggregator.
+        @SuppressWarnings("unchecked")
+        Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(fqClass);
+        this.map.put(name, aggregatorClass);
+      } catch (ClassNotFoundException cnfe) {
+        throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(fqClass, name), cnfe);
+      }
+    }
+  }
+
+  /**
+   * Asks remote members for their UDA profiles, trying up to five times.
+   * If every attempt fails, an error is logged together with the last failure.
+   */
+  public void collectUDAsFromRemote() {
+    final int numTries = 5;
+    Exception lastFailure = null;
+    for (int i = 0; i < numTries; ++i) {
+      try {
+        new UpdateAttributesProcessor(this).collect(DataSerializableFixedID.UDA_PROFILE);
+        return;
+      } catch (Exception e) {
+        // Presumably throws when a shutdown is in progress; otherwise retry.
+        this.getDistributionManager().getCancelCriterion().checkCancelInProgress(e);
+        lastFailure = e;
+      }
+    }
+    if (logger.isErrorEnabled()) {
+      // Include the last failure so the reason is not lost.
+      logger.error(LocalizedStrings.UDA_MANAGER_Udas_Not_Collected.toLocalizedString(), lastFailure);
+    }
+  }
+
+  /**
+   * Merges UDAs received from another member into the local registry.
+   * <p>
+   * Unknown names are registered if their class can be loaded. A name that is known
+   * locally with a different class is logged as a conflict and left unchanged.
+   *
+   * @param udas received UDAs: name -> class name
+   * @param checkforOnlyLocal if {@code true}, UDAs registered here but absent from
+   *        {@code udas} are sent out in a profile update
+   */
+  void addUDAs(Map<String, String> udas, boolean checkforOnlyLocal) {
+    // Get those UDAs which are present only locally
+    final Map<String, Class<Aggregator>> onlyLocalUDAs = new HashMap<String, Class<Aggregator>>();
+    if (logger.isInfoEnabled()) {
+      logger.info("UDAManagerImpl::addUDAs: adding remote collected UDAs=" + udas + " check for local only =" + checkforOnlyLocal);
+    }
+    synchronized (this) {
+      if (checkforOnlyLocal) {
+        for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+          if (!udas.containsKey(entry.getKey())) {
+            onlyLocalUDAs.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+      for (Map.Entry<String, String> entry : udas.entrySet()) {
+        String udaName = entry.getKey();
+        String udaClass = entry.getValue();
+        Class<Aggregator> existingUDAClass = this.map.get(udaName);
+        if (existingUDAClass == null) {
+          try {
+            @SuppressWarnings("unchecked")
+            Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(udaClass);
+            this.map.put(udaName, aggregatorClass);
+          } catch (ClassNotFoundException cnfe) {
+            logger.error(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(udaClass, udaName), cnfe);
+          }
+        } else if (!udaClass.equals(existingUDAClass.getName())) {
+          // same name, different class
+          logger.error(LocalizedStrings.UDA_MANAGER_Class_Conflict.toLocalizedString(udaName, existingUDAClass.getName(), udaClass));
+        }
+      }
+    }
+
+    if (checkforOnlyLocal && !onlyLocalUDAs.isEmpty()) {
+      if (logger.isInfoEnabled()) {
+        logger.info("UDAManagerImpl::addUDAs:Only local UDAs=" + onlyLocalUDAs);
+      }
+      localOnlyUDAS.set(onlyLocalUDAs);
+      try {
+        new UpdateAttributesProcessor(this).sendProfileUpdate(false);
+      } finally {
+        // If the send failed, or never reached fillInProfile, the subset must not
+        // stay on this thread and leak into a later, unrelated profile.
+        localOnlyUDAS.remove();
+      }
+    }
+  }
+
+  /**
+   * @param name the UDA name
+   * @return the class registered under that name
+   * @throws NameResolutionException if no class is registered under that name
+   */
+  public Class<Aggregator> getUDAClass(String name) throws NameResolutionException {
+    Class<Aggregator> aggClass = this.map.get(name);
+    if (aggClass == null) {
+      throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Aggregator_Not_Found.toLocalizedString(name));
+    }
+    return aggClass;
+  }
+
+  /** Removes every registration from this member's registry. */
+  public synchronized void clear() {
+    this.map.clear();
+  }
+
+  /**
+   * Removes the UDA on this member and then sends a {@link UDAMessage} so that
+   * other members remove it too.
+   */
+  @Override
+  public void removeUDA(String udaName) {
+    this.removeUDALocally(udaName);
+    new UDAMessage(udaName).send();
+  }
+
+  /** Removes the UDA on this member only. */
+  void removeUDALocally(String udaName) {
+    synchronized (this) {
+      this.map.remove(udaName);
+    }
+  }
+
+  @Override
+  public DM getDistributionManager() {
+    return getSystem().getDistributionManager();
+  }
+
+  // NOTE(review): returns null, while collectUDAsFromRemote() takes the cancel
+  // criterion from the distribution manager -- confirm no caller relies on this.
+  @Override
+  public CancelCriterion getCancelCriterion() {
+    return null;
+  }
+
+  @Override
+  public DistributionAdvisor getDistributionAdvisor() {
+    return this.advisor;
+  }
+
+  /** @return a profile newly created by this manager's advisor */
+  @Override
+  public Profile getProfile() {
+    return this.advisor.createProfile();
+  }
+
+  @Override
+  public DistributionAdvisee getParentAdvisee() {
+    return null;
+  }
+
+  @Override
+  public InternalDistributedSystem getSystem() {
+    return InternalDistributedSystem.getConnectedInstance();
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+
+  @Override
+  public String getFullPath() {
+    return null;
+  }
+
+  /**
+   * Copies UDA names and class names into the given profile. If {@link #addUDAs}
+   * left a local-only subset for this thread, that subset is used (and cleared);
+   * otherwise the whole registry is copied.
+   *
+   * @param profile the profile to fill; must be a {@link UDAProfile}
+   */
+  @Override
+  public void fillInProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    synchronized (this) {
+      Map<String, Class<Aggregator>> localOnly = localOnlyUDAS.get();
+      localOnlyUDAS.remove();
+      Map<String, Class<Aggregator>> toSend = localOnly != null ? localOnly : this.map;
+      for (Map.Entry<String, Class<Aggregator>> entry : toSend.entrySet()) {
+        udap.putInProfile(entry.getKey(), entry.getValue().getName());
+      }
+    }
+  }
+
+  @Override
+  public int getSerialNumber() {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
new file mode 100644
index 0000000..706499d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Used for sending UDA creation / removal messages to other members.
+ * <p>
+ * A creation message carries the UDA name and the name of its class; a removal
+ * message carries only the UDA name.
+ *
+ * @author ashahid
+ * @since 9.0
+ */
+public class UDAMessage extends DistributionMessage {
+  private static final Logger logger = LogService.getLogger();
+
+  // Not final: populated by fromData() on the receiving side.
+  private boolean isCreate;
+  private String udaName;
+  private String udaClass;
+
+  /** No-arg constructor (presumably required for deserialization -- TODO confirm). */
+  public UDAMessage() {}
+
+  /**
+   * Builds a creation message.
+   *
+   * @param name the UDA name
+   * @param udaClass the name of the class implementing the UDA
+   */
+  public UDAMessage(String name, String udaClass) {
+    this.isCreate = true;
+    this.udaName = name;
+    this.udaClass = udaClass;
+  }
+
+  /**
+   * Builds a removal message.
+   *
+   * @param name the UDA name
+   */
+  public UDAMessage(String name) {
+    this.isCreate = false;
+    this.udaName = name;
+  }
+
+  @Override
+  public int getDSFID() {
+    return UDA_MESSAGE;
+  }
+
+  @Override
+  public int getProcessorType() {
+    return DistributionManager.SERIAL_EXECUTOR;
+  }
+
+  /**
+   * Applies the message to the receiving member's UDA manager: registers the UDA
+   * locally for a creation message, removes it locally otherwise. A failure to
+   * register is logged and not propagated.
+   */
+  @Override
+  protected void process(DistributionManager dm) {
+    if (this.isCreate) {
+      try {
+        GemFireCacheImpl.getExisting().getUDAManager().createUDALocally(this.udaName, this.udaClass);
+      } catch (Exception e) {
+        // Report through the logging framework rather than stderr, so the failure
+        // shows up in the member's log with its context.
+        logger.error("UDAMessage::process: failed to create UDA " + this.udaName + " with class " + this.udaClass, e);
+      }
+    } else {
+      GemFireCacheImpl.getExisting().getUDAManager().removeUDALocally(this.udaName);
+    }
+  }
+
+  /** Reads the fields in the order written by toData; the class name is present only for creation messages. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.isCreate = DataSerializer.readPrimitiveBoolean(in);
+    this.udaName = DataSerializer.readString(in);
+    if (this.isCreate) {
+      this.udaClass = DataSerializer.readString(in);
+    }
+  }
+
+  /** Writes the create flag, the UDA name and, for creation messages only, the class name. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writePrimitiveBoolean(this.isCreate, out);
+    DataSerializer.writeString(this.udaName, out);
+    if (this.isCreate) {
+      DataSerializer.writeString(this.udaClass, out);
+    }
+  }
+
+  /**
+   * Sends this message to every member reported by the cache's distribution
+   * advisor, except this member itself.
+   */
+  public void send() {
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DistributionAdvisor advisor = gfc.getDistributionAdvisor();
+    // Work on a copy so the advisor's set is not modified.
+    final Set<InternalDistributedMember> recipients = new HashSet<InternalDistributedMember>(advisor.adviseGeneric());
+    recipients.remove(gfc.getDistributionManager().getId());
+    this.setRecipients(recipients);
+    gfc.getDistributionManager().putOutgoing(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
new file mode 100644
index 0000000..60d862c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.xmlcache;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+
+/**
+ * {@link UDAManager} that only records UDA registrations in a local map. It does
+ * not load the named classes and sends nothing to other members.
+ * (Presumably the placeholder used while a cache XML description is being built,
+ * judging by the package -- TODO confirm.)
+ */
+public class UDAManagerCreation implements UDAManager {
+  /** UDA name -> class name, exactly as passed to {@link #createUDA}. */
+  private final Map<String, String> udaClassNames = new HashMap<String, String>();
+
+  /**
+   * @return a read-only view of the recorded registrations
+   */
+  @Override
+  public Map<String, String> getUDAs() {
+    return Collections.unmodifiableMap(this.udaClassNames);
+  }
+
+  /**
+   * Records the registration, replacing any earlier one under the same name.
+   *
+   * @param name the UDA name
+   * @param fqClass the fully qualified name of the class implementing the UDA
+   */
+  @Override
+  public void createUDA(String name, String fqClass) {
+    this.udaClassNames.put(name, fqClass);
+  }
+
+  /**
+   * Forgets the registration recorded under the given name, if any.
+   *
+   * @param name the UDA name
+   */
+  @Override
+  public void removeUDA(String name) {
+    this.udaClassNames.remove(name);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
new file mode 100644
index 0000000..2abcce2
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl.SumUDA;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass1;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass2;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass3;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
+import com.gemstone.gemfire.internal.cache.xmlcache.ClientCacheCreation;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDACreationDUnitTest extends CacheTestCase {
+ /**
+ * Creates the test case; the name is passed straight to the superclass constructor.
+ *
+ * @param name test name handed to the superclass
+ */
+ public UDACreationDUnitTest(String name) {
+ super(name);
+ }
+
+ /**
+  * Registers a UDA in this VM before the caches in VMs 1-3 are created, then
+  * checks that this VM and each of those VMs can resolve the UDA by name.
+  */
+ @Test
+ public void testUDACreationThroughProfileExchange() throws Exception {
+   final Host dunitHost = Host.getHost(0);
+   final VM[] members = { dunitHost.getVM(1), dunitHost.getVM(2), dunitHost.getVM(3) };
+   getCache(); // returned cache is not needed here; the call itself is kept
+   final String udaName = "uda1";
+   final QueryService queryService = CacheUtils.getQueryService();
+   queryService.createUDA(udaName,
+       "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+   createCache(members);
+   validateUDAExists(udaName, members);
+   closeCache(members);
+ }
+
+ /**
+  * Creates the caches in VMs 1-3 first and registers the UDA in this VM
+  * afterwards, then checks that this VM and each of those VMs can resolve it.
+  */
+ @Test
+ public void testUDACreationThroughMessage() throws Exception {
+   final Host dunitHost = Host.getHost(0);
+   final VM[] members = { dunitHost.getVM(1), dunitHost.getVM(2), dunitHost.getVM(3) };
+   getCache(); // returned cache is not needed here; the call itself is kept
+   createCache(members);
+   final String udaName = "uda1";
+   final QueryService queryService = CacheUtils.getQueryService();
+   queryService.createUDA(udaName,
+       "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+   validateUDAExists(udaName, members);
+   closeCache(members);
+ }
+
+ /**
+  * Registers a UDA after all caches exist, confirms every VM resolves it,
+  * removes it again in this VM and confirms no VM resolves it any more.
+  */
+ @Test
+ public void testUDARemovalThroughMessage() throws Exception {
+   final Host dunitHost = Host.getHost(0);
+   final VM[] members = { dunitHost.getVM(1), dunitHost.getVM(2), dunitHost.getVM(3) };
+   getCache(); // returned cache is not needed here; the call itself is kept
+   createCache(members);
+   final String udaName = "uda1";
+   final QueryService queryService = CacheUtils.getQueryService();
+   queryService.createUDA(udaName,
+       "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+   validateUDAExists(udaName, members);
+   queryService.removeUDA(udaName);
+   validateUDADoesNotExists(udaName, members);
+   closeCache(members);
+ }
+
+ /**
+  * Creates a cache declaring "uda1" in this VM from generated cache XML, then
+  * a cache declaring "uda2" and "uda3" in VM 1, and checks that both VMs end
+  * up listing all three UDAs.
+  */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testUDAProfileMerge() {
+   Host host = Host.getHost(0);
+   final VM vm1 = host.getVM(1);
+
+   final CacheCreation cacheCreation1 = new CacheCreation();
+   cacheCreation1.addUDA("uda1", UDACLass1.class.getName());
+   Helper helper = new Helper();
+   helper.createCacheThruXML(cacheCreation1);
+   final Cache c = getCache();
+   assertNotNull(c);
+   UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+   Map<String, String> map = udaMgr.getUDAs();
+   // expected value goes first so that a failure message reads the right way round
+   assertEquals(UDACLass1.class.getName(), map.get("uda1"));
+
+   // Now in VM1 create another cache through XML containing uda2 and uda3
+   vm1.invoke(new SerializableRunnable("Create Cache in other VM") {
+     public void run() {
+       final CacheCreation cacheCreation2 = new CacheCreation();
+       cacheCreation2.addUDA("uda2", UDACLass2.class.getName());
+       cacheCreation2.addUDA("uda3", UDACLass3.class.getName());
+       Helper helper = new Helper();
+       helper.createCacheThruXML(cacheCreation2);
+       getCache(); // result unused; the call is kept as in the original flow
+     }
+   });
+
+   // This VM should also have 3 UDAs at the end of initialization of the remote VM
+   map = udaMgr.getUDAs();
+   assertEquals(UDACLass1.class.getName(), map.get("uda1"));
+   assertEquals(UDACLass2.class.getName(), map.get("uda2"));
+   assertEquals(UDACLass3.class.getName(), map.get("uda3"));
+
+   vm1.invoke(new SerializableRunnable("Validate UDAs in other VM") {
+     public void run() {
+       // validate that at the end of initialization there exist 3 UDAs
+       UDAManager udaMgr = ((GemFireCacheImpl) getCache()).getUDAManager();
+       Map<String, String> map = udaMgr.getUDAs();
+       assertEquals(UDACLass1.class.getName(), map.get("uda1"));
+       assertEquals(UDACLass2.class.getName(), map.get("uda2"));
+       assertEquals(UDACLass3.class.getName(), map.get("uda3"));
+     }
+   });
+ }
+
+
+ /**
+  * Starts caches in VMs 1-3 concurrently, each declaring one UDA, creates the
+  * cache in this VM meanwhile, and checks that every VM ends up knowing all
+  * three UDAs.
+  */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testUDAProfileMergeMultipleVMs() throws Exception {
+   Host host = Host.getHost(0);
+   final VM vm1 = host.getVM(1);
+   final VM vm2 = host.getVM(2);
+   final VM vm3 = host.getVM(3);
+   AsyncInvocation one = this.createCacheWithUDAAsynchronously("uda1", UDACLass1.class.getName(), vm1);
+   AsyncInvocation two = this.createCacheWithUDAAsynchronously("uda2", UDACLass2.class.getName(), vm2);
+   AsyncInvocation three = this.createCacheWithUDAAsynchronously("uda3", UDACLass3.class.getName(), vm3);
+
+   final Cache c = getCache();
+   assertNotNull(c);
+   // join() only waits; a failure inside the remote runnable has to be
+   // checked explicitly or it would go unnoticed
+   for (AsyncInvocation creation : new AsyncInvocation[] { one, two, three }) {
+     creation.join();
+     if (creation.exceptionOccurred()) {
+       throw new RuntimeException("Asynchronous cache creation failed", creation.getException());
+     }
+   }
+   UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+   Map<String, String> map = udaMgr.getUDAs();
+   // expected value goes first so that a failure message reads the right way round
+   assertEquals(UDACLass1.class.getName(), map.get("uda1"));
+   assertEquals(UDACLass2.class.getName(), map.get("uda2"));
+   assertEquals(UDACLass3.class.getName(), map.get("uda3"));
+
+   validateUDAExists("uda1", vm1, vm2, vm3);
+   validateUDAExists("uda2", vm1, vm2, vm3);
+   validateUDAExists("uda3", vm1, vm2, vm3);
+ }
+
+ /** Set-up hook: calls {@code disconnectAllFromDS()} so each test starts disconnected. */
+ @Override
+ public final void postSetUp() throws Exception {
+ disconnectAllFromDS();
+ }
+
+ /** Tear-down hook: calls {@code disconnectAllFromDS()} once a test has finished. */
+ @Override
+ public final void postTearDownCacheTestCase() throws Exception {
+ disconnectAllFromDS();
+ }
+
+ /** Invokes {@code getCache()} inside each of the given VMs, one VM after another. */
+ private void createCache(VM... vms) {
+   for (VM member : vms) {
+     member.invoke(new SerializableRunnable("create cache") {
+       public void run() {
+         getCache(); // only the call matters; the returned cache is not used
+       }
+     });
+   }
+ }
+
+ /**
+  * Asserts that the UDA registered as {@code udaName} resolves to a class,
+  * first in this VM and then in each of the given VMs.
+  */
+ private void validateUDAExists(final String udaName, VM... vms) {
+   assertUDAResolvable(udaName);
+   for (VM member : vms) {
+     member.invoke(new SerializableRunnable("validate UDA exists") {
+       public void run() {
+         assertUDAResolvable(udaName);
+       }
+     });
+   }
+ }
+
+ /**
+  * Looks {@code udaName} up through the cache's UDA manager in the current VM
+  * and asserts the result is non-null; a NameResolutionException is rethrown
+  * wrapped in a RuntimeException.
+  */
+ private void assertUDAResolvable(String udaName) {
+   final Class<Aggregator> resolved;
+   try {
+     resolved = ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+   } catch (NameResolutionException nre) {
+     throw new RuntimeException(nre);
+   }
+   assertNotNull(resolved);
+ }
+
+ private AsyncInvocation createCacheWithUDAAsynchronously(final String udaName,
+ final String udaClass, VM vm) {
+ // Now in VM1 create another cache through XMl containing uda2
+ return vm.invokeAsync(new SerializableRunnable("Create Cache in other VM") {
+ public void run() {
+ final CacheCreation cacheCreation2 = new CacheCreation();
+ cacheCreation2.addUDA(udaName, udaClass);
+ Helper helper = new Helper();
+ helper.createCacheThruXML(cacheCreation2);
+ final Cache c = getCache();
+
+ }
+ });
+ }
+
+ /**
+  * Asserts that looking up {@code udaName} fails with a
+  * NameResolutionException, first in this VM and then in each of the given VMs.
+  */
+ private void validateUDADoesNotExists(final String udaName, VM... vms) {
+   try {
+     ((GemFireCacheImpl) this.getCache()).getUDAManager().getUDAClass(udaName);
+     fail("UDA should not exist");
+   } catch (NameResolutionException nre) {
+     // expected: the UDA is no longer registered
+   }
+   for (VM vm : vms) {
+     // label corrected: this runnable checks for absence, not existence
+     vm.invoke(new SerializableRunnable("validate UDA does not exist") {
+       public void run() {
+         try {
+           ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+           fail("UDA should not exist");
+         } catch (NameResolutionException nre) {
+           // expected: the UDA is no longer registered
+         }
+       }
+     });
+   }
+ }
+
+ private void closeCache(VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ getCache().close();
+ }
+ });
+ }
+ }
+
+ /**
+ * Returns {@link CacheXml#VERSION_1_0}.
+ * NOTE(review): nothing in this class calls this method and it carries no
+ * @Override; presumably it mirrors the hook of the same name in Helper - confirm it is needed.
+ */
+ protected String getGemFireVersion() {
+ return CacheXml.VERSION_1_0;
+ }
+
+ /**
+  * Aggregator whose callbacks all do nothing and whose result is always
+  * {@code null}; the tests in this class register it by class name.
+  */
+ public static class SumUDA implements Aggregator {
+
+   @Override
+   public void init() {
+     // intentionally empty
+   }
+
+   @Override
+   public void accumulate(Object value) {
+     // intentionally empty
+   }
+
+   @Override
+   public void merge(Aggregator otherAggregator) {
+     // intentionally empty
+   }
+
+   @Override
+   public Object terminate() {
+     return null;
+   }
+
+ }
+
+ /**
+  * No-op aggregator used only as a class name to register.
+  * Declared static: a non-static inner class needs an enclosing test instance
+  * and therefore cannot be created through a no-arg constructor.
+  */
+ public static class UDACLass1 implements Aggregator {
+
+   @Override
+   public void accumulate(Object value) {
+   }
+
+   @Override
+   public void init() {
+   }
+
+   @Override
+   public Object terminate() {
+     return null;
+   }
+
+   @Override
+   public void merge(Aggregator otherAggregator) {
+   }
+
+ }
+
+ /**
+  * No-op aggregator used only as a class name to register.
+  * Declared static: a non-static inner class needs an enclosing test instance
+  * and therefore cannot be created through a no-arg constructor.
+  */
+ public static class UDACLass2 implements Aggregator {
+
+   @Override
+   public void accumulate(Object value) {
+   }
+
+   @Override
+   public void init() {
+   }
+
+   @Override
+   public Object terminate() {
+     return null;
+   }
+
+   @Override
+   public void merge(Aggregator otherAggregator) {
+   }
+
+ }
+
+ /**
+  * No-op aggregator used only as a class name to register.
+  * Declared static: a non-static inner class needs an enclosing test instance
+  * and therefore cannot be created through a no-arg constructor.
+  */
+ public static class UDACLass3 implements Aggregator {
+
+   @Override
+   public void accumulate(Object value) {
+   }
+
+   @Override
+   public void init() {
+   }
+
+   @Override
+   public Object terminate() {
+     return null;
+   }
+
+   @Override
+   public void merge(Aggregator otherAggregator) {
+   }
+
+ }
+
+ /**
+  * Gives the enclosing test access to {@code CacheXmlTestCase.testXml} so a
+  * cache can be created from a {@link CacheCreation}.
+  */
+ public static class Helper extends CacheXmlTestCase {
+
+   /** Also switches the static {@code lonerDistributedSystem} flag of CacheXmlTestCase off. */
+   public Helper() {
+     super("UDACreationDUnitTest:testUDAProfileMerge");
+     CacheXmlTestCase.lonerDistributedSystem = false;
+   }
+
+   @Override
+   protected boolean getUseSchema() {
+     return true;
+   }
+
+   @Override
+   protected String getGemFireVersion() {
+     return CacheXml.VERSION_1_0;
+   }
+
+   /** Hands the declarative cache to {@code testXml}, passing {@code true} as its second argument. */
+   public void createCacheThruXML(CacheCreation creation) {
+     testXml(creation, true);
+   }
+
+   // NOTE(review): empty test, presumably so a test runner finds at least one test here - confirm
+   @Test
+   public void testDummy() {
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
new file mode 100644
index 0000000..7f80657
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.IndexType;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestInterface;
+import com.gemstone.gemfire.cache.query.functional.NonDistinctOrderByTestImplementation;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Distributed-test base class that runs the scenarios of a
+ * {@link UDATestInterface} obtained from {@link #createTestInstance()} and then
+ * closes the cache in VMs 0-3.
+ *
+ * @author ashahid
+ */
+public abstract class UDADUnitImpl extends CacheTestCase implements UDATestInterface {
+
+  public UDADUnitImpl(String name) {
+    super(name);
+  }
+
+  /** Returns the scenario object whose test methods this class invokes. */
+  protected abstract UDATestInterface createTestInstance();
+
+  /** Runs the delegate's no-group-by scenario, then closes the caches. */
+  @Test
+  @Override
+  public void testUDANoGroupBy() throws Exception {
+    final VM[] members = firstFourVMs();
+    this.getCache(); // returned cache is not used here
+    final UDATestInterface scenario = createTestInstance();
+    scenario.testUDANoGroupBy();
+    closeCache(members);
+  }
+
+  /** Runs the delegate's group-by scenario, then closes the caches. */
+  @Test
+  @Override
+  public void testUDAWithGroupBy() throws Exception {
+    final VM[] members = firstFourVMs();
+    this.getCache(); // returned cache is not used here
+    final UDATestInterface scenario = createTestInstance();
+    scenario.testUDAWithGroupBy();
+    closeCache(members);
+  }
+
+  /** Looks up VMs 0 to 3 of host 0, in that order. */
+  private VM[] firstFourVMs() {
+    final Host dunitHost = Host.getHost(0);
+    return new VM[] { dunitHost.getVM(0), dunitHost.getVM(1), dunitHost.getVM(2), dunitHost.getVM(3) };
+  }
+
+  /**
+   * Creates an index through the query service of the cache in the given VM;
+   * any of the declared index-creation exceptions fails the test.
+   */
+  protected void createIndex(VM vm, final String indexName,
+      final String indexedExpression, final String regionPath) {
+    final SerializableRunnable indexTask = new SerializableRunnable("create index") {
+      public void run() {
+        try {
+          getCache().getQueryService().createIndex(indexName, indexedExpression, regionPath);
+        } catch (RegionNotFoundException notFound) {
+          fail(notFound.toString());
+        } catch (IndexExistsException exists) {
+          fail(exists.toString());
+        } catch (IndexNameConflictException conflict) {
+          fail(conflict.toString());
+        }
+      }
+    };
+    vm.invoke(indexTask);
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  /** Calls {@code getCache().close()} inside each of the given VMs, in order. */
+  private void closeCache(VM... vms) {
+    for (VM member : vms) {
+      member.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
new file mode 100644
index 0000000..ba0ce9a
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDAPartitionedQueryDUnitTest extends UDADUnitImpl {
+
+
+ public UDAPartitionedQueryDUnitTest(String name) {
+ super(name);
+ }
+
+ @Override
+ protected UDATestInterface createTestInstance() {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+
+ UDATestImpl test = new UDATestImpl() {
+
+ @Override
+ public Region createRegion(String regionName, Class valueConstraint) {
+ // TODO Auto-generated method stub
+ Region rgn = createAccessor(regionName, valueConstraint);
+ createPR(vm1, regionName, valueConstraint);
+ createPR(vm2, regionName, valueConstraint);
+ createPR(vm3, regionName, valueConstraint);
+ return rgn;
+ }
+ };
+ return test;
+ }
+
+ private void createBuckets(VM vm) {
+ vm.invoke(new SerializableRunnable("create accessor") {
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region");
+ for (int i = 0; i < 10; i++) {
+ region.put(i, i);
+ }
+ }
+ });
+ }
+
+ private void createPR(VM vm, final String regionName,
+ final Class valueConstraint) {
+ vm.invoke(new SerializableRunnable("create data store") {
+ public void run() {
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+ cache.createRegionFactory(RegionShortcut.PARTITION)
+ .setValueConstraint(valueConstraint)
+ .setPartitionAttributes(paf.create()).create(regionName);
+ }
+ });
+ }
+
+ private Region createAccessor(String regionName, Class valueConstraint) {
+
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+ paf.setLocalMaxMemory(0);
+ return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+ .setValueConstraint(valueConstraint)
+ .setPartitionAttributes(paf.create()).create(regionName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
new file mode 100644
index 0000000..1f5b3e7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/** Runs the UDA scenarios against a region created with default partition attributes. */
+@Category(IntegrationTest.class)
+public class UDAPartitionedJUnitTest extends UDATestImpl {
+
+  /**
+   * Builds region attributes carrying default partition attributes and the
+   * given value constraint, then creates the region through {@link CacheUtils}.
+   */
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    final PartitionAttributesFactory partitioning = new PartitionAttributesFactory();
+    final AttributesFactory attributes = new AttributesFactory();
+    attributes.setPartitionAttributes(partitioning.create());
+    attributes.setValueConstraint(valueConstraint);
+    return CacheUtils.createRegion(regionName, attributes.create(), false);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
new file mode 100644
index 0000000..9db76ab
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/** Runs the UDA scenarios against a region created by {@code CacheUtils.createRegion(name, constraint)}. */
+@Category(IntegrationTest.class)
+public class UDAReplicatedJUnitTest extends UDATestImpl {
+
+  /** Delegates region creation to {@link CacheUtils}, passing both arguments through. */
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    return CacheUtils.createRegion(regionName, valueConstraint);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
new file mode 100644
index 0000000..7b59243
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+
+/**
+ * Query scenarios that exercise a user defined aggregate (UDA), with and
+ * without a group by clause. Subclasses decide which kind of region the
+ * scenarios run against.
+ *
+ * @author Asif
+ */
+public abstract class UDATestImpl implements UDATestInterface {
+
+  /**
+   * Creates the region that a scenario populates and queries.
+   *
+   * @param regionName name of the region to create
+   * @param constraint value constraint for the region
+   */
+  public abstract Region createRegion(String regionName, Class constraint);
+
+  /**
+   * Sums the IDs of 199 portfolios with the "myUDA" aggregate and compares the
+   * query result with the sum computed locally.
+   */
+  @Test
+  public void testUDANoGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sum = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      sum += pf.ID;
+    }
+    String queryStr = "select myUDA(p.ID) from /portfolio p where p.ID > 0 ";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    assertEquals(sum, ((Integer) sr.asList().get(0)).intValue());
+  }
+
+  /**
+   * Sums portfolio IDs per status with the "myUDA" aggregate and compares
+   * both groups (ordered by status) with the sums computed locally.
+   */
+  @Test
+  public void testUDAWithGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sumActive = 0;
+    int sumInactive = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      if (pf.status.equals("active")) {
+        sumActive += pf.ID;
+      } else {
+        sumInactive += pf.ID;
+      }
+    }
+    String queryStr = "select p.status , myUDA(p.ID) from /portfolio p where p.ID > 0 group by p.status order by p.status";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    List<Struct> structs = (List<Struct>) sr.asList();
+    assertEquals(2, structs.size());
+    // assertEquals reports the actual value on failure and tolerates null
+    assertEquals("active", structs.get(0).getFieldValues()[0]);
+    assertEquals(sumActive, ((Integer) structs.get(0).getFieldValues()[1]).intValue());
+
+    assertEquals("inactive", structs.get(1).getFieldValues()[0]);
+    assertEquals(sumInactive, ((Integer) structs.get(1).getFieldValues()[1]).intValue());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CacheUtils.startCache();
+    CacheUtils.getCache();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CacheUtils.closeCache();
+  }
+
+  /** Aggregate that adds up the {@code Integer} values passed to it. */
+  public static class SumUDA implements Aggregator, Serializable {
+
+    /** Running total of everything accumulated or merged so far. */
+    private int sum = 0;
+
+    public SumUDA() {
+    }
+
+    /** Adds {@code value}, which must be an {@code Integer}, to the running total. */
+    @Override
+    public void accumulate(Object value) {
+      sum += ((Integer) value).intValue();
+    }
+
+    /**
+     * Resets the running total so a reused instance starts from zero.
+     * NOTE(review): assumes init() is only called before accumulation starts - confirm against the Aggregator contract.
+     */
+    @Override
+    public void init() {
+      sum = 0;
+    }
+
+    /** Returns the running total as an {@code Integer}. */
+    @Override
+    public Object terminate() {
+      return Integer.valueOf(sum);
+    }
+
+    /** Adds the total of {@code otherAggregator}, which must be a {@code SumUDA}, to this one. */
+    @Override
+    public void merge(Aggregator otherAggregator) {
+      SumUDA uda = (SumUDA) otherAggregator;
+      this.sum += uda.sum;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
new file mode 100644
index 0000000..f7c9e40
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+/** Scenarios that every UDA test variant has to provide. */
+public interface UDATestInterface {
+
+  /** Scenario using a UDA without a group by clause. */
+  void testUDANoGroupBy() throws Exception;
+
+  /** Scenario using a UDA together with a group by clause. */
+  void testUDAWithGroupBy() throws Exception;
+}