You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/21 00:29:27 UTC

[5/6] incubator-geode git commit: classes for implementing UDA functionality

classes for implementing UDA functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/42fb6fc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/42fb6fc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/42fb6fc0

Branch: refs/heads/feature/GEODE-1269
Commit: 42fb6fc0cff310c976f7ab1e5621c59ad4aabf01
Parents: 8bc7afa
Author: Asif Shahid <as...@snappydata.io>
Authored: Thu Apr 14 22:12:25 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Thu Apr 14 22:12:25 2016 -0700

----------------------------------------------------------------------
 .../gemfire/cache/query/UDAExistsException.java |  43 ++
 .../query/internal/CompiledUDAFunction.java     |  67 +++
 .../aggregate/uda/UDADistributionAdvisor.java   | 187 +++++++++
 .../internal/aggregate/uda/UDAManager.java      |  35 ++
 .../internal/aggregate/uda/UDAManagerImpl.java  | 229 +++++++++++
 .../internal/aggregate/uda/UDAMessage.java      | 106 +++++
 .../cache/xmlcache/UDAManagerCreation.java      |  44 ++
 .../cache/query/dunit/UDACreationDUnitTest.java | 403 +++++++++++++++++++
 .../gemfire/cache/query/dunit/UDADUnitImpl.java | 122 ++++++
 .../dunit/UDAPartitionedQueryDUnitTest.java     | 102 +++++
 .../functional/UDAPartitionedJUnitTest.java     |  40 ++
 .../functional/UDAReplicatedJUnitTest.java      |  34 ++
 .../cache/query/functional/UDATestImpl.java     | 140 +++++++
 .../query/functional/UDATestInterface.java      |  22 +
 14 files changed, 1574 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
new file mode 100644
index 0000000..e40a80a
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query;
+
+/**
+ * This Exception is thrown if a UDA with a given name already exists in the system,
+ * and a new UDA is being registered with the same name
+ * 
+ * @author ashahid
+ *
+ */
+public class UDAExistsException extends QueryException{
+
+  private static final long serialVersionUID = -1643528839352144L;  
+  
+  public UDAExistsException(String msg) {
+    super(msg);
+  }
+  
+  /**
+   * Constructs instance of UDAExistsException with error message and cause
+   * @param msg the error message
+   * @param cause a Throwable that is a cause of this exception
+   */
+  public UDAExistsException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
new file mode 100644
index 0000000..e00b909
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query.internal;
+
+import com.gemstone.gemfire.cache.CacheRuntimeException;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Represents the UserDefinedAggregate function node
+ * @author ashahid
+ * @since 9.0
+ *
+ */
+public class CompiledUDAFunction extends CompiledAggregateFunction {
+  private final String udaName;
+
+  public CompiledUDAFunction(CompiledValue expr, int aggFunc, String name) {
+    super(expr, aggFunc);
+    this.udaName = name;
+  }
+
+  @Override
+  public Aggregator evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+                                                      QueryInvocationTargetException {
+    Class<Aggregator> aggregatorClass = ((GemFireCacheImpl) context.getCache()).getUDAManager().getUDAClass(this.udaName);
+    try {
+      return aggregatorClass.newInstance();
+    } catch (Exception e) {
+      throw new CacheRuntimeException(e) {
+      };
+    }
+
+  }
+
+  @Override
+  public ObjectType getObjectType() {
+    return new ObjectTypeImpl(Object.class);
+  }
+
+  @Override
+  protected String getStringRep() {
+    return this.udaName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
new file mode 100644
index 0000000..2d65cb7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Handles the exchange of UDAProfile with the peers. 
+ * It is used for exchanging information about the UDAs existing in the system.
+ * @author ashahid
+ * @since 9.0
+ */
+public class UDADistributionAdvisor extends DistributionAdvisor {
+
+  private UDADistributionAdvisor(DistributionAdvisee sender) {
+    super(sender);
+
+  }
+
+  public static UDADistributionAdvisor createUDAAdvisor(DistributionAdvisee sender) {
+    UDADistributionAdvisor advisor = new UDADistributionAdvisor(sender);
+    advisor.initialize();
+    return advisor;
+  }
+
+  /** Instantiate new Sender profile for this member */
+  @Override
+  protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
+    return new UDAProfile(memberId, version);
+  }
+
+  @Override
+  public Set<InternalDistributedMember> adviseProfileExchange() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DM dm = ids.getDistributionManager();
+    InternalDistributedMember elder = dm.getElderId();
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    if (elder == null || elder.equals(dm.getId()) || locators.contains(elder)) {
+      elder = null;
+      Set<InternalDistributedMember> allMembers = gfc.getDistributionAdvisor().adviseGeneric();
+      Iterator<InternalDistributedMember> iter = allMembers.iterator();
+      while(iter.hasNext()) {
+        InternalDistributedMember temp = iter.next();
+        if(!locators.contains(temp)) {
+          elder = temp;
+          break;
+        }
+      }
+    }
+    if (elder != null) {
+      return Collections.singleton(elder);
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public Set<InternalDistributedMember> adviseProfileUpdate() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();   
+    DM dm = ids.getDistributionManager();
+    
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting(); 
+    Set<InternalDistributedMember> all = gfc.getDistributionAdvisor().adviseGeneric();
+    all.removeAll(locators);
+    return all;
+  }
+
+  /**
+   * Create or update a profile for a remote counterpart.
+   * 
+   * @param profile
+   *          the profile, referenced by this advisor after this method returns.
+   */
+  @Override
+  public boolean putProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    GemFireCacheImpl.getExisting().getUDAManager().addUDAs(udap.udas, udap.getCheckforLocalOnlyUDAs());
+    return true;
+  }
+
+  @Override
+  public boolean removeProfile(Profile profile, boolean destroyed) {
+    return true;
+  }
+  
+  @Override
+  public boolean initializationGate() {    
+    return false;
+  }
+
+  /**
+   * Profile information for a remote counterpart.
+   */
+  public static final class UDAProfile extends DistributionAdvisor.Profile {
+
+    private HashMap<String, String> udas = new HashMap<String, String>();
+    private boolean checkForLocalOnlyUDAs = false;
+    public UDAProfile(InternalDistributedMember memberId, int version) {
+      super(memberId, version);
+    }
+
+    public UDAProfile() {
+    }
+    
+    void setCheckForLocalOnlyUDAsAsTrue() {
+      this.checkForLocalOnlyUDAs = true;
+    }
+    
+    boolean getCheckforLocalOnlyUDAs() {
+      return this.checkForLocalOnlyUDAs;
+    }
+    
+    void putInProfile(String udaName, String udaClass) {
+      this.udas.put(udaName, udaClass);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.udas = DataSerializer.readHashMap(in);
+      this.checkForLocalOnlyUDAs = DataSerializer.readPrimitiveBoolean(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeHashMap(this.udas, out);
+      DataSerializer.writePrimitiveBoolean(this.checkForLocalOnlyUDAs, out);
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      return null;
+    }
+
+    @Override
+    public int getDSFID() {
+      return UDA_PROFILE;
+    }
+
+    @Override
+    public void processIncoming(DistributionManager dm, String adviseePath, boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
+      handleDistributionAdvisee(GemFireCacheImpl.getExisting().getUDAManager(), removeProfile, exchangeProfiles, replyProfiles);
+    }
+
+    @Override
+    public void collectProfile(DistributionManager dm, String adviseePath, final List<Profile> replyProfiles) {
+      UDAProfile udap = (UDAProfile)GemFireCacheImpl.getExisting().getUDAManager().getProfile();
+      udap.setCheckForLocalOnlyUDAsAsTrue();
+      replyProfiles.add(udap);
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
new file mode 100644
index 0000000..f6a462e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+
+/**
+ * Interface for creating & removing UDAs.
+ * 
+ * @see UDAManagerImpl
+ * @author ashahid
+ * @since 9.0
+ */
+public interface UDAManager {
+  public Map<String, String> getUDAs() ;
+  public void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException;
+  public void removeUDA(String name) ;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
new file mode 100644
index 0000000..964922e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDADistributionAdvisor.UDAProfile;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class UDAManagerImpl implements DistributionAdvisee, UDAManager {
+
+  private ConcurrentMap<String, Class<Aggregator>> map;
+
+  private final DistributionAdvisor advisor;
+  private static final Logger logger = LogService.getLogger();
+  private static ThreadLocal<Map<String, Class<Aggregator>>> localOnlyUDAS = new ThreadLocal<Map<String, Class<Aggregator>>>();
+
+  public UDAManagerImpl() {
+    this.map = new ConcurrentHashMap<String, Class<Aggregator>>();
+    this.advisor = UDADistributionAdvisor.createUDAAdvisor(this);
+
+  }
+
+  public void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException {
+    createUDALocally(name, fqClass);
+    new UDAMessage(name, fqClass).send();
+  }
+
+  public Map<String, String> getUDAs() {
+    Map<String, String> mapping = new HashMap<String, String>();
+    for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+      mapping.put(entry.getKey(), entry.getValue().getName());
+    }
+    return mapping;
+  }
+
+  public void createUDALocally(String name, String fqClass) throws NameResolutionException, UDAExistsException {
+    synchronized (this) {
+      if (!this.map.containsKey(name)) {
+        try {
+          Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(fqClass);
+          this.map.put(name, aggregatorClass);
+        } catch (ClassNotFoundException cnfe) {
+          throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(fqClass, name), cnfe);
+        }
+      } else {
+        throw new UDAExistsException(LocalizedStrings.UDA_MANAGER_Uda_Exists.toLocalizedString(name));
+      }
+    }
+  }
+
+  public void collectUDAsFromRemote() {
+    int numTries = 5;
+    boolean collectedUDAs = false;
+    for (int i = 0; i < numTries; ++i) {
+      try {
+        new UpdateAttributesProcessor(this).collect(DataSerializableFixedID.UDA_PROFILE);
+        collectedUDAs = true;
+        break;
+      } catch (Exception e) {
+        this.getDistributionManager().getCancelCriterion().checkCancelInProgress(e);
+      }
+    }
+    if (!collectedUDAs) {
+      if (logger.isErrorEnabled()) {
+        logger.error(LocalizedStrings.UDA_MANAGER_Udas_Not_Collected.toLocalizedString());
+      }
+    }
+  }
+
+  void addUDAs(Map<String, String> udas, boolean checkforOnlyLocal) {
+    // Get those UDAs which are present only locally
+    final Map<String, Class<Aggregator>> onlyLocalUDAs = new HashMap<String, Class<Aggregator>>();
+    if (logger.isInfoEnabled()) {
+      logger.info("UDAManagerImpl::addUDAs: adding remote collected UDAs=" + udas + " check for local only =" + checkforOnlyLocal);
+    }
+    synchronized (this) {
+      if (checkforOnlyLocal) {
+        for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+          if (!udas.containsKey(entry.getKey())) {
+            onlyLocalUDAs.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+      for (Map.Entry<String, String> entry : udas.entrySet()) {
+        String udaName = entry.getKey();
+        String udaClass = entry.getValue();
+        Class<Aggregator> existingUDAClass = this.map.get(udaName);
+        if (existingUDAClass == null) {
+          try {
+            Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(udaClass);
+            this.map.put(udaName, aggregatorClass);
+          } catch (ClassNotFoundException cnfe) {
+            logger.error(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(udaClass, udaName));
+          }
+        } else {
+          // check if the classes are same
+          if (!udaClass.equals(existingUDAClass.getName())) {
+            logger.error(LocalizedStrings.UDA_MANAGER_Class_Conflict.toLocalizedString(udaName, existingUDAClass.getName(), udaClass));
+          }
+        }
+      }
+    }
+
+    if (checkforOnlyLocal && !onlyLocalUDAs.isEmpty()) {
+      if (logger.isInfoEnabled()) {
+        logger.info("UDAManagerImpl::addUDAs:Only local UDAs=" + onlyLocalUDAs);
+      }
+      localOnlyUDAS.set(onlyLocalUDAs);
+      new UpdateAttributesProcessor(UDAManagerImpl.this).sendProfileUpdate(false);
+    }
+
+  }
+
+  public Class<Aggregator> getUDAClass(String name) throws NameResolutionException {
+    Class<Aggregator> aggClass = this.map.get(name);
+    if (aggClass == null) {
+      throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Aggregator_Not_Found.toLocalizedString(name));
+    }
+    return aggClass;
+  }
+
+  public synchronized void clear() {
+    this.map.clear();
+  }
+
+  @Override
+  public void removeUDA(String udaName) {
+    this.removeUDALocally(udaName);
+    new UDAMessage(udaName).send();
+  }
+
+  void removeUDALocally(String udaName) {
+    synchronized (this) {
+      this.map.remove(udaName);
+    }
+  }
+
+  @Override
+  public DM getDistributionManager() {
+    return getSystem().getDistributionManager();
+  }
+
+  @Override
+  public CancelCriterion getCancelCriterion() {
+    return null;
+  }
+
+  @Override
+  public DistributionAdvisor getDistributionAdvisor() {
+
+    return this.advisor;
+  }
+
+  @Override
+  public Profile getProfile() {
+    return this.advisor.createProfile();
+  }
+
+  @Override
+  public DistributionAdvisee getParentAdvisee() {
+    return null;
+  }
+
+  @Override
+  public InternalDistributedSystem getSystem() {
+    return InternalDistributedSystem.getConnectedInstance();
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+
+  @Override
+  public String getFullPath() {
+    return null;
+  }
+
+  @Override
+  public void fillInProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    synchronized (this) {
+      Map<String, Class<Aggregator>> localOnly = localOnlyUDAS.get();
+      localOnlyUDAS.set(null);
+      Map<String, Class<Aggregator>> toSend = localOnly != null ? localOnly : this.map;
+      for (Map.Entry<String, Class<Aggregator>> entry : toSend.entrySet()) {
+        udap.putInProfile(entry.getKey(), entry.getValue().getName());
+      }
+    }
+
+  }
+
+  @Override
+  public int getSerialNumber() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
new file mode 100644
index 0000000..706499d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Used for sending UDA creation / removal message
+ * @author ashahid
+ * @since 9.0 
+ */
+public class UDAMessage extends DistributionMessage {
+  private boolean isCreate;
+  private String udaName;
+  private String udaClass;
+
+  public UDAMessage() {}
+
+  public UDAMessage(String name, String udaClass) {
+    this.isCreate = true;
+    this.udaName = name;
+    this.udaClass = udaClass;
+  }
+
+  public UDAMessage(String name) {
+    this.isCreate = false;
+    this.udaName = name;
+  }
+
+  @Override
+  public int getDSFID() {
+    return UDA_MESSAGE;
+  }
+
+  @Override
+  public int getProcessorType() {
+    return DistributionManager.SERIAL_EXECUTOR;
+  }
+
+  @Override
+  protected void process(DistributionManager dm) {
+    if (this.isCreate) {
+      try {
+        GemFireCacheImpl.getExisting().getUDAManager().createUDALocally(this.udaName, this.udaClass);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    } else {
+      GemFireCacheImpl.getExisting().getUDAManager().removeUDALocally(this.udaName);
+    }
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.isCreate = DataSerializer.readPrimitiveBoolean(in);
+    this.udaName = DataSerializer.readString(in);
+    if (this.isCreate) {
+      this.udaClass = DataSerializer.readString(in);
+    }
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writePrimitiveBoolean(this.isCreate, out);
+    DataSerializer.writeString(this.udaName, out);
+    if (this.isCreate) {
+      DataSerializer.writeString(this.udaClass, out);
+    }
+  }
+
+  public void send() {
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DistributionAdvisor advisor = gfc.getDistributionAdvisor();
+    final Set<InternalDistributedMember> recipients = new HashSet<InternalDistributedMember>(advisor.adviseGeneric());
+    recipients.remove(gfc.getDistributionManager().getId());
+    this.setRecipients(recipients);
+    gfc.getDistributionManager().putOutgoing(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
new file mode 100644
index 0000000..60d862c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.xmlcache;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+
+public class UDAManagerCreation implements UDAManager {
+  private Map<String, String> map = new HashMap<String, String>();
+  
+  @Override
+  public Map<String, String> getUDAs() {
+    return Collections.unmodifiableMap(this.map);
+  }
+  
+  @Override
+  public void createUDA(String name, String fqClass) {
+    this.map.put(name, fqClass);  
+  }
+  
+  @Override
+  public void removeUDA(String name) {
+    this.map.remove(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
new file mode 100644
index 0000000..2abcce2
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl.SumUDA;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass1;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass2;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass3;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
+import com.gemstone.gemfire.internal.cache.xmlcache.ClientCacheCreation;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDACreationDUnitTest extends CacheTestCase {
+  public UDACreationDUnitTest(String name) {
+    super(name);
+  }
+
+  @Test
+  public void testUDACreationThroughProfileExchange() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    createCache(vm1, vm2, vm3);
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testUDACreationThroughMessage() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    createCache(vm1, vm2, vm3);
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testUDARemovalThroughMessage() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    createCache(vm1, vm2, vm3);
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    qs.removeUDA(udaName);
+    validateUDADoesNotExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testUDAProfileMerge() {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+
+    final CacheCreation cacheCreation1 = new CacheCreation();
+    cacheCreation1.addUDA("uda1", UDACLass1.class.getName());
+    Helper helper = new Helper();
+    helper.createCacheThruXML(cacheCreation1);
+    final Cache c = getCache();
+    assertNotNull(c);
+    UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+
+    // Now in VM1 create another cache through XMl containing uda2
+    vm1.invoke(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+
+        final CacheCreation cacheCreation2 = new CacheCreation();
+        cacheCreation2.addUDA("uda2", UDACLass2.class.getName());
+        cacheCreation2.addUDA("uda3", UDACLass3.class.getName());
+        Helper helper = new Helper();
+        helper.createCacheThruXML(cacheCreation2);
+        final Cache c = getCache();
+
+      }
+    });
+    // This VM should also have 3 UDAs at the end of intialization of remote vm
+
+    map = udaMgr.getUDAs();
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+    assertEquals(map.get("uda2"), UDACLass2.class.getName());
+    assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+    vm1.invoke(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+        // validate at the end of intialization, there exists 3 UDAs
+        UDAManager udaMgr = ((GemFireCacheImpl) getCache()).getUDAManager();
+        Map<String, String> map = udaMgr.getUDAs();
+        assertEquals(map.get("uda1"), UDACLass1.class.getName());
+        assertEquals(map.get("uda2"), UDACLass2.class.getName());
+        assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+      }
+    });
+
+  }
+  
+  
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testUDAProfileMergeMultipleVMs() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    AsyncInvocation one = this.createCacheWithUDAAsynchronously("uda1", UDACLass1.class.getName(), vm1);
+    AsyncInvocation two = this.createCacheWithUDAAsynchronously("uda2", UDACLass2.class.getName(), vm2);
+    AsyncInvocation three = this.createCacheWithUDAAsynchronously("uda3", UDACLass3.class.getName(), vm3);
+   
+    final Cache c = getCache();
+    assertNotNull(c);
+    one.join();
+    two.join();
+    three.join();
+    UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();   
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+    assertEquals(map.get("uda2"), UDACLass2.class.getName());
+    assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+    validateUDAExists("uda1", vm1, vm2, vm3);
+    validateUDAExists("uda2", vm1, vm2, vm3);
+    validateUDAExists("uda3", vm1, vm2, vm3);
+
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  private void createCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("create cache") {
+        public void run() {
+          Cache cache = getCache();
+        }
+      });
+    }
+  }
+
+  private void validateUDAExists(final String udaName, VM... vms) {
+    try {
+      Class<Aggregator> udaClass = ((GemFireCacheImpl) this.getCache()).getUDAManager().getUDAClass(udaName);
+      assertNotNull(udaClass);
+    } catch (NameResolutionException nre) {
+      throw new RuntimeException(nre);
+    }
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("validate UDA exists") {
+        public void run() {
+          try {
+            Class<Aggregator> udaClass = ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+            assertNotNull(udaClass);
+          } catch (NameResolutionException nre) {
+            throw new RuntimeException(nre);
+          }
+
+        }
+      });
+    }
+  }
+  
+  private AsyncInvocation createCacheWithUDAAsynchronously(final String udaName, 
+      final String udaClass, VM vm) {
+    // Now in VM1 create another cache through XMl containing uda2
+    return vm.invokeAsync(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+        final CacheCreation cacheCreation2 = new CacheCreation();
+        cacheCreation2.addUDA(udaName, udaClass);       
+        Helper helper = new Helper();
+        helper.createCacheThruXML(cacheCreation2);
+        final Cache c = getCache();
+
+      }
+    });
+  }
+
+  private void validateUDADoesNotExists(final String udaName, VM... vms) {
+    try {
+      Class<Aggregator> udaClass = ((GemFireCacheImpl) this.getCache()).getUDAManager().getUDAClass(udaName);
+      fail("UDA should not exist");
+    } catch (NameResolutionException nre) {
+      // OK
+    }
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("validate UDA exists") {
+        public void run() {
+          try {
+            Class<Aggregator> udaClass = ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+            fail("UDA should not exist");
+          } catch (NameResolutionException nre) {
+            // OK
+          }
+
+        }
+      });
+    }
+  }
+
+  private void closeCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+  protected String getGemFireVersion() {
+    return CacheXml.VERSION_1_0;
+  }
+
+  public static class SumUDA implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+
+    }
+
+  }
+
+  public class UDACLass1 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public class UDACLass2 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public class UDACLass3 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public static class Helper extends CacheXmlTestCase {
+    public Helper() {
+      super("UDACreationDUnitTest:testUDAProfileMerge");
+      CacheXmlTestCase.lonerDistributedSystem = false;
+    }
+
+    public void createCacheThruXML(CacheCreation creation) {
+      this.testXml(creation, true);
+    }
+
+    @Override
+    protected String getGemFireVersion() {
+      return CacheXml.VERSION_1_0;
+    }
+
+    @Override
+    protected boolean getUseSchema() {
+      return true;
+    }
+    
+    @Test
+    public void testDummy(){}
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
new file mode 100644
index 0000000..7f80657
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.IndexType;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestInterface;
+import com.gemstone.gemfire.cache.query.functional.NonDistinctOrderByTestImplementation;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * 
+ * @author ashahid
+ *
+ */
+public abstract class UDADUnitImpl extends CacheTestCase implements UDATestInterface{
+
+
+  public UDADUnitImpl(String name) {
+    super(name);
+  }
+
+  protected abstract UDATestInterface createTestInstance();
+
+  @Test
+  @Override
+  public void testUDANoGroupBy() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    UDATestInterface test = createTestInstance();
+    test.testUDANoGroupBy();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  @Override
+  public void testUDAWithGroupBy() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    UDATestInterface test = createTestInstance();
+    test.testUDAWithGroupBy();;
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+ 
+  
+  protected void createIndex(VM vm, final String indexName,
+      final String indexedExpression, final String regionPath) {
+    vm.invoke(new SerializableRunnable("create index") {
+      public void run() {
+        try {
+          Cache cache = getCache();
+          cache.getQueryService().createIndex(indexName, indexedExpression,
+              regionPath);
+        } catch (RegionNotFoundException e) {
+          fail(e.toString());
+        } catch (IndexExistsException e) {
+          fail(e.toString());
+        } catch (IndexNameConflictException e) {
+          fail(e.toString());
+        }
+      }
+    });
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+  
+
+  private void closeCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
new file mode 100644
index 0000000..ba0ce9a
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDAPartitionedQueryDUnitTest extends UDADUnitImpl {
+
+
+  public UDAPartitionedQueryDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  protected UDATestInterface createTestInstance() {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+
+    UDATestImpl test = new UDATestImpl() {
+
+      @Override
+      public Region createRegion(String regionName, Class valueConstraint) {
+        // TODO Auto-generated method stub
+        Region rgn = createAccessor(regionName, valueConstraint);
+        createPR(vm1, regionName, valueConstraint);
+        createPR(vm2, regionName, valueConstraint);
+        createPR(vm3, regionName, valueConstraint);
+        return rgn;
+      }
+    };
+    return test;
+  }
+
+  private void createBuckets(VM vm) {
+    vm.invoke(new SerializableRunnable("create accessor") {
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region");
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+  }
+
+  private void createPR(VM vm, final String regionName,
+      final Class valueConstraint) {
+    vm.invoke(new SerializableRunnable("create data store") {
+      public void run() {
+        Cache cache = getCache();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setTotalNumBuckets(10);
+        cache.createRegionFactory(RegionShortcut.PARTITION)
+            .setValueConstraint(valueConstraint)
+            .setPartitionAttributes(paf.create()).create(regionName);
+      }
+    });
+  }
+
+  private Region createAccessor(String regionName, Class valueConstraint) {
+
+    Cache cache = getCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(10);
+    paf.setLocalMaxMemory(0);
+    return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        .setValueConstraint(valueConstraint)
+        .setPartitionAttributes(paf.create()).create(regionName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
new file mode 100644
index 0000000..1f5b3e7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class UDAPartitionedJUnitTest extends UDATestImpl {
+
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    AttributesFactory af = new AttributesFactory();
+    af.setPartitionAttributes(paf.create());
+    af.setValueConstraint(valueConstraint);
+    Region r1 = CacheUtils.createRegion(regionName, af.create(), false);
+    return r1;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
new file mode 100644
index 0000000..9db76ab
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class UDAReplicatedJUnitTest extends UDATestImpl {
+
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    Region r1 = CacheUtils.createRegion(regionName, valueConstraint);
+    return r1;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
new file mode 100644
index 0000000..7b59243
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+
+/**
+ * Tests the group by queries with or without aggreagte functions
+ * 
+ * @author Asif
+ *
+ *
+ */
+public abstract class UDATestImpl implements UDATestInterface {
+
+ 
+  public abstract Region createRegion(String regionName, Class constraint);
+
+  @Test
+  public void testUDANoGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sum = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      sum += pf.ID;
+    }
+    String queryStr = "select myUDA(p.ID) from /portfolio p where p.ID > 0  ";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    assertEquals(sum, ((Integer)sr.asList().get(0)).intValue());
+  }
+
+  @Test
+  public void testUDAWithGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sumActive = 0;
+    int sumInactive = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      if(pf.status.equals("active")) {
+        sumActive += pf.ID;
+      }else {
+        sumInactive += pf.ID;
+      }
+      
+    }
+    String queryStr = "select p.status , myUDA(p.ID) from /portfolio p where p.ID > 0 group by p.status order by p.status";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    List<Struct> structs = (List<Struct>)sr.asList();
+    assertEquals(2, structs.size());
+    assertTrue(structs.get(0).getFieldValues()[0].equals("active"));
+    assertEquals(sumActive, ((Integer)structs.get(0).getFieldValues()[1]).intValue());
+    
+    assertTrue(structs.get(1).getFieldValues()[0].equals("inactive"));
+    assertEquals(sumInactive, ((Integer)structs.get(1).getFieldValues()[1]).intValue());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CacheUtils.startCache();
+    CacheUtils.getCache();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CacheUtils.closeCache();
+  }
+  
+  public static class SumUDA implements Aggregator, Serializable {
+
+    private int sum =0;
+    
+    public SumUDA() {
+      
+    }
+    @Override
+    public void accumulate(Object value) {
+      sum += ((Integer)value).intValue();
+      
+    }
+
+    @Override
+    public void init() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public Object terminate() {      
+      return Integer.valueOf(sum);
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+      SumUDA uda = (SumUDA)otherAggregator;
+      this.sum += uda.sum;      
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
new file mode 100644
index 0000000..f7c9e40
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+public interface UDATestInterface {
+  public void testUDANoGroupBy() throws Exception ;  
+  public void  testUDAWithGroupBy() throws Exception ;
+}