You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/21 00:29:23 UTC

[1/6] incubator-geode git commit: Implementing the UDA functionality in the OQL engine

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1269 [created] 4d2203524


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-site/website/content/schema/cache/cache-1.0.xsd
----------------------------------------------------------------------
diff --git a/geode-site/website/content/schema/cache/cache-1.0.xsd b/geode-site/website/content/schema/cache/cache-1.0.xsd
index c9c4f91..d8fc707 100644
--- a/geode-site/website/content/schema/cache/cache-1.0.xsd
+++ b/geode-site/website/content/schema/cache/cache-1.0.xsd
@@ -77,6 +77,18 @@ declarative caching XML file elements unless indicated otherwise.
       <xsd:sequence>
         <xsd:element maxOccurs="1" minOccurs="0" name="cache-transaction-manager" type="gf:cache-transaction-manager-type" />
         <xsd:element maxOccurs="1" minOccurs="0" name="dynamic-region-factory" type="gf:dynamic-region-factory-type" />
+        <xsd:element maxOccurs="1" minOccurs="0" name="uda-manager">
+          <xsd:complexType>
+           <xsd:sequence>
+            <xsd:element maxOccurs="unbounded" minOccurs="0" name="uda">
+              <xsd:complexType>
+                <xsd:attribute name="name" type="xsd:string" use="required" />
+                <xsd:attribute name="class" type="xsd:string" use="required" />
+              </xsd:complexType>
+            </xsd:element>
+          </xsd:sequence>
+         </xsd:complexType>
+        </xsd:element>  
         <xsd:element maxOccurs="unbounded" minOccurs="0" name="gateway-hub">
           <xsd:annotation>
             <xsd:documentation>
@@ -1354,6 +1366,7 @@ As of 6.5 disk-dirs is deprecated on region-attributes. Use disk-store-name inst
     </xsd:sequence>
   </xsd:complexType>
 
+
   <xsd:complexType name="jndi-bindings-type">
     <xsd:annotation>
       <xsd:documentation>


[4/6] incubator-geode git commit: Removing the inbuilt aggregate classes specific to bucket nodes/ query nodes

Posted by up...@apache.org.
Removing the inbuilt aggregate classes specific to bucket nodes/ query nodes


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8bc7afac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8bc7afac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8bc7afac

Branch: refs/heads/feature/GEODE-1269
Commit: 8bc7afac334ecdeee89d9576af95aea2e73a52bf
Parents: 4f85cac
Author: Asif Shahid <as...@snappydata.io>
Authored: Thu Apr 14 22:03:14 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Thu Apr 14 22:03:14 2016 -0700

----------------------------------------------------------------------
 .../query/internal/aggregate/AvgBucketNode.java | 48 --------------------
 .../aggregate/AvgDistinctPRQueryNode.java       | 34 --------------
 .../internal/aggregate/AvgPRQueryNode.java      | 46 -------------------
 .../aggregate/CountDistinctPRQueryNode.java     | 43 ------------------
 .../internal/aggregate/CountPRQueryNode.java    | 47 -------------------
 .../aggregate/SumDistinctPRQueryNode.java       | 45 ------------------
 6 files changed, 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgBucketNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgBucketNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgBucketNode.java
deleted file mode 100644
index dae038e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgBucketNode.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-import com.gemstone.gemfire.cache.query.QueryService;
-
-/**
- * The aggregator for compuing average which is used on the bucket node for
- * partitioned region based queries.
- * 
- *
- */
-public class AvgBucketNode extends Sum {
-
-  private int count = 0;
-
-  @Override
-  public void accumulate(Object value) {
-    if (value != null && value != QueryService.UNDEFINED) {
-      super.accumulate(value);
-      ++count;
-    }
-  }
-
-  /**
-   * Returns a two element array of the total number of values & the computed
-   * sum of the values.
-   */
-  @Override
-  public Object terminate() {
-    return new Object[] { Integer.valueOf(count), super.terminate() };
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
deleted file mode 100644
index 20d368d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinctPRQueryNode.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-/**
- * Computes the final average of distinct values for the partitioned region
- * based queries. This aggregator is initialized on the PR query node & acts on
- * the results obtained from bucket nodes.
- * 
- *
- */
-public class AvgDistinctPRQueryNode extends SumDistinctPRQueryNode {
-
-  @Override
-  public Object terminate() {
-    double sum = ((Number) super.terminate()).doubleValue();
-    double result = sum / this.distinct.size();
-    return downCast(result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgPRQueryNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgPRQueryNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgPRQueryNode.java
deleted file mode 100644
index f892971..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgPRQueryNode.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-/**
- * Computes the final non distinct average for a partitioned region based query.
- * This aggregator is instantiated on the PR query node.
- * 
- *
- */
-public class AvgPRQueryNode extends Sum {
-  private int count = 0;
-
-  /**
-   * Takes the input of data received from bucket nodes. The data is of the form
-   * of two element array. The first element is the number of values, while the
-   * second element is the sum of the values.
-   */
-  @Override
-  public void accumulate(Object value) {
-    Object[] array = (Object[]) value;
-    this.count += ((Integer) array[0]).intValue();
-    super.accumulate(array[1]);
-  }
-
-  @Override
-  public Object terminate() {
-    double sum = ((Number) super.terminate()).doubleValue();
-    double result = sum / count;
-    return downCast(result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinctPRQueryNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
deleted file mode 100644
index b2f88a7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinctPRQueryNode.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-import java.util.Set;
-
-/**
- * Computes the count of the distinct rows on the PR query node.
- * 
- *
- */
-public class CountDistinctPRQueryNode extends DistinctAggregator {
-
-  /**
-   * The input data is the Set containing distinct values from each of the
-   * bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
-
-  }
-
-  @Override
-  public Object terminate() {
-    return Integer.valueOf(this.distinct.size());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountPRQueryNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountPRQueryNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountPRQueryNode.java
deleted file mode 100644
index 50eb07b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountPRQueryNode.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-import com.gemstone.gemfire.cache.query.Aggregator;
-
-/**
- * Computes the count of the rows on the PR query node
- * 
- *
- */
-public class CountPRQueryNode implements Aggregator {
-  private int count = 0;
-
-  /**
-   * Recieves the input of the individual counts from the bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.count += ((Integer) value).intValue();
-  }
-
-  @Override
-  public void init() {
-
-  }
-
-  @Override
-  public Object terminate() {
-    return Integer.valueOf(count);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8bc7afac/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinctPRQueryNode.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
deleted file mode 100644
index bee5df2..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinctPRQueryNode.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.query.internal.aggregate;
-
-import java.util.Set;
-
-/**
- * Computes the sum of distinct values on the PR query node.
- * 
- *
- */
-public class SumDistinctPRQueryNode extends DistinctAggregator {
-
-  /**
-   * The input data is the Set of values(distinct) receieved from each of the
-   * bucket nodes.
-   */
-  @Override
-  public void accumulate(Object value) {
-    this.distinct.addAll((Set) value);
-  }
-
-  @Override
-  public Object terminate() {
-    double sum = 0;
-    for (Object o : this.distinct) {
-      sum += ((Number) o).doubleValue();
-    }
-    return downCast(sum);
-  }
-}


[6/6] incubator-geode git commit: Fix for the test failure caused due to addition of new element uda-manager

Posted by up...@apache.org.
Fix for the test failure caused due to addition of new element uda-manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4d220352
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4d220352
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4d220352

Branch: refs/heads/feature/GEODE-1269
Commit: 4d22035246d53258f970ffac2f9df5e6f3359994
Parents: 42fb6fc
Author: Asif Shahid <as...@snappydata.io>
Authored: Fri Apr 15 10:59:37 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Fri Apr 15 10:59:37 2016 -0700

----------------------------------------------------------------------
 .../internal/configuration/domain/CacheElementJUnitTest.java        | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4d220352/geode-core/src/test/java/com/gemstone/gemfire/management/internal/configuration/domain/CacheElementJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/management/internal/configuration/domain/CacheElementJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/management/internal/configuration/domain/CacheElementJUnitTest.java
index dca5d0b..de4f8b7 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/management/internal/configuration/domain/CacheElementJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/management/internal/configuration/domain/CacheElementJUnitTest.java
@@ -112,6 +112,7 @@ public class CacheElementJUnitTest {
     int order = 0;
     assertEntry("cache-transaction-manager", order++, entries.next());
     assertEntry("dynamic-region-factory", order++, entries.next());
+    assertEntry("uda-manager", order++, entries.next());
     assertEntry("gateway-hub", order++, entries.next());
     assertEntry("gateway-sender", order++, entries.next());
     assertEntry("gateway-receiver", order++, entries.next());


[2/6] incubator-geode git commit: Implementing the UDA functionality in the OQL engine

Posted by up...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLParser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLParser.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLParser.java
index bb70b05..1ae0f9e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLParser.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLParser.java
@@ -1,4 +1,4 @@
-// $ANTLR 2.7.4: "oql.g" -> "OQLParser.java"$
+// $ANTLR 2.7.7 (20060906): "oql.g" -> "OQLParser.java"$
 
 package com.gemstone.gemfire.cache.query.internal.parse;
 import java.util.*;
@@ -293,7 +293,7 @@ public OQLParser(ParserSharedInputState state) {
 				synPredMatched153 = false;
 			}
 			rewind(_m153);
-			inputState.guessing--;
+inputState.guessing--;
 		}
 		if ( synPredMatched153 ) {
 			identifier();
@@ -1644,7 +1644,7 @@ public OQLParser(ParserSharedInputState state) {
 		expr();
 		astFactory.addASTChild(currentAST, returnAST);
 		{
-		_loop268:
+		_loop269:
 		do {
 			if ((LA(1)==TOK_COMMA)) {
 				match(TOK_COMMA);
@@ -1652,7 +1652,7 @@ public OQLParser(ParserSharedInputState state) {
 				astFactory.addASTChild(currentAST, returnAST);
 			}
 			else {
-				break _loop268;
+				break _loop269;
 			}
 			
 		} while (true);
@@ -1766,7 +1766,7 @@ public OQLParser(ParserSharedInputState state) {
 				synPredMatched183 = false;
 			}
 			rewind(_m183);
-			inputState.guessing--;
+inputState.guessing--;
 		}
 		if ( synPredMatched183 ) {
 			lp = LT(1);
@@ -1947,7 +1947,7 @@ public OQLParser(ParserSharedInputState state) {
 					synPredMatched195 = false;
 				}
 				rewind(_m195);
-				inputState.guessing--;
+inputState.guessing--;
 			}
 			if ( synPredMatched195 ) {
 				AST tmp99_AST = null;
@@ -2594,17 +2594,6 @@ public OQLParser(ParserSharedInputState state) {
 		
 		{
 		switch ( LA(1)) {
-		case LITERAL_distinct:
-		case LITERAL_listtoset:
-		case LITERAL_element:
-		case LITERAL_flatten:
-		case LITERAL_nvl:
-		case LITERAL_to_date:
-		{
-			conversionExpr();
-			astFactory.addASTChild(currentAST, returnAST);
-			break;
-		}
 		case LITERAL_exists:
 		case LITERAL_first:
 		case LITERAL_last:
@@ -2688,46 +2677,51 @@ public OQLParser(ParserSharedInputState state) {
 			break;
 		}
 		default:
-			boolean synPredMatched246 = false;
-			if (((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (LA(2)==TOK_LPAREN))) {
-				int _m246 = mark();
-				synPredMatched246 = true;
-				inputState.guessing++;
-				try {
-					{
-					identifier();
-					match(TOK_LPAREN);
-					identifier();
-					match(TOK_COLON);
+			if ((_tokenSet_28.member(LA(1))) && (LA(2)==TOK_LPAREN)) {
+				conversionExpr();
+				astFactory.addASTChild(currentAST, returnAST);
+			}
+			else {
+				boolean synPredMatched246 = false;
+				if (((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (LA(2)==TOK_LPAREN))) {
+					int _m246 = mark();
+					synPredMatched246 = true;
+					inputState.guessing++;
+					try {
+						{
+						identifier();
+						match(TOK_LPAREN);
+						identifier();
+						match(TOK_COLON);
+						}
+					}
+					catch (RecognitionException pe) {
+						synPredMatched246 = false;
 					}
+					rewind(_m246);
+inputState.guessing--;
 				}
-				catch (RecognitionException pe) {
-					synPredMatched246 = false;
+				if ( synPredMatched246 ) {
+					objectConstruction();
+					astFactory.addASTChild(currentAST, returnAST);
 				}
-				rewind(_m246);
-				inputState.guessing--;
-			}
-			if ( synPredMatched246 ) {
-				objectConstruction();
-				astFactory.addASTChild(currentAST, returnAST);
-			}
-			else if ((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (LA(2)==TOK_LPAREN)) {
-				methodInvocation(true);
-				astFactory.addASTChild(currentAST, returnAST);
+				else if ((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (LA(2)==TOK_LPAREN)) {
+					methodInvocation(true);
+					astFactory.addASTChild(currentAST, returnAST);
+				}
+				else if ((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (_tokenSet_27.member(LA(2)))) {
+					identifier();
+					astFactory.addASTChild(currentAST, returnAST);
+				}
+			else {
+				throw new NoViableAltException(LT(1), getFilename());
 			}
-			else if ((LA(1)==QuotedIdentifier||LA(1)==Identifier) && (_tokenSet_27.member(LA(2)))) {
-				identifier();
-				astFactory.addASTChild(currentAST, returnAST);
+			}}
 			}
-		else {
-			throw new NoViableAltException(LT(1), getFilename());
-		}
-		}
+			primaryExpr_AST = (AST)currentAST.root;
+			returnAST = primaryExpr_AST;
 		}
-		primaryExpr_AST = (AST)currentAST.root;
-		returnAST = primaryExpr_AST;
-	}
-	
+		
 	public final void index() throws RecognitionException, TokenStreamException {
 		
 		returnAST = null;
@@ -2868,6 +2862,9 @@ public OQLParser(ParserSharedInputState state) {
 		returnAST = null;
 		ASTPair currentAST = new ASTPair();
 		AST conversionExpr_AST = null;
+		Token  n = null;
+		com.gemstone.gemfire.cache.query.internal.parse.ASTAggregateFunc n_AST = null;
+		AST tokExpr1_AST = null;
 		
 		{
 		switch ( LA(1)) {
@@ -2960,6 +2957,28 @@ public OQLParser(ParserSharedInputState state) {
 			}
 			break;
 		}
+		case Identifier:
+		{
+			{
+			n = LT(1);
+			n_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTAggregateFunc)astFactory.create(n,"com.gemstone.gemfire.cache.query.internal.parse.ASTAggregateFunc");
+			astFactory.makeASTRoot(currentAST, n_AST);
+			match(Identifier);
+			match(TOK_LPAREN);
+			expr();
+			tokExpr1_AST = (AST)returnAST;
+			astFactory.addASTChild(currentAST, returnAST);
+			match(TOK_RPAREN);
+			if ( inputState.guessing==0 ) {
+				conversionExpr_AST = (AST)currentAST.root;
+				
+				((ASTAggregateFunc)conversionExpr_AST).setAggregateFunctionType(UDA);
+				((ASTAggregateFunc)conversionExpr_AST).setUDAName(n.getText());
+				
+			}
+			}
+			break;
+		}
 		default:
 		{
 			throw new NoViableAltException(LT(1), getFilename());
@@ -2980,33 +2999,33 @@ public OQLParser(ParserSharedInputState state) {
 		switch ( LA(1)) {
 		case LITERAL_first:
 		{
-			AST tmp156_AST = null;
-			tmp156_AST = astFactory.create(LT(1));
-			astFactory.makeASTRoot(currentAST, tmp156_AST);
+			AST tmp158_AST = null;
+			tmp158_AST = astFactory.create(LT(1));
+			astFactory.makeASTRoot(currentAST, tmp158_AST);
 			match(LITERAL_first);
 			break;
 		}
 		case LITERAL_last:
 		{
-			AST tmp157_AST = null;
-			tmp157_AST = astFactory.create(LT(1));
-			astFactory.makeASTRoot(currentAST, tmp157_AST);
+			AST tmp159_AST = null;
+			tmp159_AST = astFactory.create(LT(1));
+			astFactory.makeASTRoot(currentAST, tmp159_AST);
 			match(LITERAL_last);
 			break;
 		}
 		case LITERAL_unique:
 		{
-			AST tmp158_AST = null;
-			tmp158_AST = astFactory.create(LT(1));
-			astFactory.makeASTRoot(currentAST, tmp158_AST);
+			AST tmp160_AST = null;
+			tmp160_AST = astFactory.create(LT(1));
+			astFactory.makeASTRoot(currentAST, tmp160_AST);
 			match(LITERAL_unique);
 			break;
 		}
 		case LITERAL_exists:
 		{
-			AST tmp159_AST = null;
-			tmp159_AST = astFactory.create(LT(1));
-			astFactory.makeASTRoot(currentAST, tmp159_AST);
+			AST tmp161_AST = null;
+			tmp161_AST = astFactory.create(LT(1));
+			astFactory.makeASTRoot(currentAST, tmp161_AST);
 			match(LITERAL_exists);
 			break;
 		}
@@ -3032,15 +3051,15 @@ public OQLParser(ParserSharedInputState state) {
 		
 		{
 		if ((LA(1)==LITERAL_is_undefined)) {
-			com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr tmp162_AST = null;
-			tmp162_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr");
-			astFactory.makeASTRoot(currentAST, tmp162_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr tmp164_AST = null;
+			tmp164_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr");
+			astFactory.makeASTRoot(currentAST, tmp164_AST);
 			match(LITERAL_is_undefined);
 		}
 		else if ((LA(1)==LITERAL_is_defined)) {
-			com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr tmp163_AST = null;
-			tmp163_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr");
-			astFactory.makeASTRoot(currentAST, tmp163_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr tmp165_AST = null;
+			tmp165_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTUndefinedExpr");
+			astFactory.makeASTRoot(currentAST, tmp165_AST);
 			match(LITERAL_is_defined);
 		}
 		else {
@@ -3086,9 +3105,9 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST structConstruction_AST = null;
 		
-		AST tmp167_AST = null;
-		tmp167_AST = astFactory.create(LT(1));
-		astFactory.makeASTRoot(currentAST, tmp167_AST);
+		AST tmp169_AST = null;
+		tmp169_AST = astFactory.create(LT(1));
+		astFactory.makeASTRoot(currentAST, tmp169_AST);
 		match(LITERAL_struct);
 		match(TOK_LPAREN);
 		fieldList();
@@ -3110,25 +3129,25 @@ public OQLParser(ParserSharedInputState state) {
 			switch ( LA(1)) {
 			case LITERAL_array:
 			{
-				AST tmp170_AST = null;
-				tmp170_AST = astFactory.create(LT(1));
-				astFactory.makeASTRoot(currentAST, tmp170_AST);
+				AST tmp172_AST = null;
+				tmp172_AST = astFactory.create(LT(1));
+				astFactory.makeASTRoot(currentAST, tmp172_AST);
 				match(LITERAL_array);
 				break;
 			}
 			case LITERAL_set:
 			{
-				com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction tmp171_AST = null;
-				tmp171_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction");
-				astFactory.makeASTRoot(currentAST, tmp171_AST);
+				com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction tmp173_AST = null;
+				tmp173_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTConstruction");
+				astFactory.makeASTRoot(currentAST, tmp173_AST);
 				match(LITERAL_set);
 				break;
 			}
 			case LITERAL_bag:
 			{
-				AST tmp172_AST = null;
-				tmp172_AST = astFactory.create(LT(1));
-				astFactory.makeASTRoot(currentAST, tmp172_AST);
+				AST tmp174_AST = null;
+				tmp174_AST = astFactory.create(LT(1));
+				astFactory.makeASTRoot(currentAST, tmp174_AST);
 				match(LITERAL_bag);
 				break;
 			}
@@ -3142,13 +3161,13 @@ public OQLParser(ParserSharedInputState state) {
 			astFactory.addASTChild(currentAST, returnAST);
 		}
 		else if ((LA(1)==LITERAL_list)) {
-			AST tmp173_AST = null;
-			tmp173_AST = astFactory.create(LT(1));
-			astFactory.makeASTRoot(currentAST, tmp173_AST);
+			AST tmp175_AST = null;
+			tmp175_AST = astFactory.create(LT(1));
+			astFactory.makeASTRoot(currentAST, tmp175_AST);
 			match(LITERAL_list);
-			AST tmp174_AST = null;
-			tmp174_AST = astFactory.create(LT(1));
-			astFactory.addASTChild(currentAST, tmp174_AST);
+			AST tmp176_AST = null;
+			tmp176_AST = astFactory.create(LT(1));
+			astFactory.addASTChild(currentAST, tmp176_AST);
 			match(TOK_LPAREN);
 			{
 			if ((_tokenSet_4.member(LA(1)))) {
@@ -3156,27 +3175,27 @@ public OQLParser(ParserSharedInputState state) {
 				astFactory.addASTChild(currentAST, returnAST);
 				{
 				if ((LA(1)==TOK_DOTDOT)) {
-					AST tmp175_AST = null;
-					tmp175_AST = astFactory.create(LT(1));
-					astFactory.addASTChild(currentAST, tmp175_AST);
+					AST tmp177_AST = null;
+					tmp177_AST = astFactory.create(LT(1));
+					astFactory.addASTChild(currentAST, tmp177_AST);
 					match(TOK_DOTDOT);
 					expr();
 					astFactory.addASTChild(currentAST, returnAST);
 				}
 				else if ((LA(1)==TOK_RPAREN||LA(1)==TOK_COMMA)) {
 					{
-					_loop278:
+					_loop279:
 					do {
 						if ((LA(1)==TOK_COMMA)) {
-							AST tmp176_AST = null;
-							tmp176_AST = astFactory.create(LT(1));
-							astFactory.addASTChild(currentAST, tmp176_AST);
+							AST tmp178_AST = null;
+							tmp178_AST = astFactory.create(LT(1));
+							astFactory.addASTChild(currentAST, tmp178_AST);
 							match(TOK_COMMA);
 							expr();
 							astFactory.addASTChild(currentAST, returnAST);
 						}
 						else {
-							break _loop278;
+							break _loop279;
 						}
 						
 					} while (true);
@@ -3195,9 +3214,9 @@ public OQLParser(ParserSharedInputState state) {
 			}
 			
 			}
-			AST tmp177_AST = null;
-			tmp177_AST = astFactory.create(LT(1));
-			astFactory.addASTChild(currentAST, tmp177_AST);
+			AST tmp179_AST = null;
+			tmp179_AST = astFactory.create(LT(1));
+			astFactory.addASTChild(currentAST, tmp179_AST);
 			match(TOK_RPAREN);
 		}
 		else {
@@ -3292,9 +3311,9 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST stringLiteral_AST = null;
 		
-		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp178_AST = null;
-		tmp178_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-		astFactory.addASTChild(currentAST, tmp178_AST);
+		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp180_AST = null;
+		tmp180_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+		astFactory.addASTChild(currentAST, tmp180_AST);
 		match(StringLiteral);
 		stringLiteral_AST = (AST)currentAST.root;
 		returnAST = stringLiteral_AST;
@@ -3312,7 +3331,7 @@ public OQLParser(ParserSharedInputState state) {
 		expr();
 		astFactory.addASTChild(currentAST, returnAST);
 		{
-		_loop271:
+		_loop272:
 		do {
 			if ((LA(1)==TOK_COMMA)) {
 				match(TOK_COMMA);
@@ -3323,7 +3342,7 @@ public OQLParser(ParserSharedInputState state) {
 				astFactory.addASTChild(currentAST, returnAST);
 			}
 			else {
-				break _loop271;
+				break _loop272;
 			}
 			
 		} while (true);
@@ -3349,27 +3368,27 @@ public OQLParser(ParserSharedInputState state) {
 		switch ( LA(1)) {
 		case LITERAL_nil:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp182_AST = null;
-			tmp182_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp182_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp184_AST = null;
+			tmp184_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp184_AST);
 			match(LITERAL_nil);
 			objectLiteral_AST = (AST)currentAST.root;
 			break;
 		}
 		case LITERAL_null:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp183_AST = null;
-			tmp183_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp183_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp185_AST = null;
+			tmp185_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp185_AST);
 			match(LITERAL_null);
 			objectLiteral_AST = (AST)currentAST.root;
 			break;
 		}
 		case LITERAL_undefined:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp184_AST = null;
-			tmp184_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp184_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp186_AST = null;
+			tmp186_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp186_AST);
 			match(LITERAL_undefined);
 			objectLiteral_AST = (AST)currentAST.root;
 			break;
@@ -3390,15 +3409,15 @@ public OQLParser(ParserSharedInputState state) {
 		
 		{
 		if ((LA(1)==LITERAL_true)) {
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp185_AST = null;
-			tmp185_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp185_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp187_AST = null;
+			tmp187_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp187_AST);
 			match(LITERAL_true);
 		}
 		else if ((LA(1)==LITERAL_false)) {
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp186_AST = null;
-			tmp186_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp186_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp188_AST = null;
+			tmp188_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp188_AST);
 			match(LITERAL_false);
 		}
 		else {
@@ -3420,33 +3439,33 @@ public OQLParser(ParserSharedInputState state) {
 		switch ( LA(1)) {
 		case NUM_INT:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp187_AST = null;
-			tmp187_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp187_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp189_AST = null;
+			tmp189_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp189_AST);
 			match(NUM_INT);
 			break;
 		}
 		case NUM_LONG:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp188_AST = null;
-			tmp188_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp188_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp190_AST = null;
+			tmp190_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp190_AST);
 			match(NUM_LONG);
 			break;
 		}
 		case NUM_FLOAT:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp189_AST = null;
-			tmp189_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp189_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp191_AST = null;
+			tmp191_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp191_AST);
 			match(NUM_FLOAT);
 			break;
 		}
 		case NUM_DOUBLE:
 		{
-			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp190_AST = null;
-			tmp190_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-			astFactory.addASTChild(currentAST, tmp190_AST);
+			com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp192_AST = null;
+			tmp192_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+			astFactory.addASTChild(currentAST, tmp192_AST);
 			match(NUM_DOUBLE);
 			break;
 		}
@@ -3466,13 +3485,13 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST charLiteral_AST = null;
 		
-		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp191_AST = null;
-		tmp191_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-		astFactory.makeASTRoot(currentAST, tmp191_AST);
+		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp193_AST = null;
+		tmp193_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+		astFactory.makeASTRoot(currentAST, tmp193_AST);
 		match(LITERAL_char);
-		AST tmp192_AST = null;
-		tmp192_AST = astFactory.create(LT(1));
-		astFactory.addASTChild(currentAST, tmp192_AST);
+		AST tmp194_AST = null;
+		tmp194_AST = astFactory.create(LT(1));
+		astFactory.addASTChild(currentAST, tmp194_AST);
 		match(StringLiteral);
 		charLiteral_AST = (AST)currentAST.root;
 		returnAST = charLiteral_AST;
@@ -3484,13 +3503,13 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST dateLiteral_AST = null;
 		
-		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp193_AST = null;
-		tmp193_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-		astFactory.makeASTRoot(currentAST, tmp193_AST);
+		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp195_AST = null;
+		tmp195_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+		astFactory.makeASTRoot(currentAST, tmp195_AST);
 		match(LITERAL_date);
-		AST tmp194_AST = null;
-		tmp194_AST = astFactory.create(LT(1));
-		astFactory.addASTChild(currentAST, tmp194_AST);
+		AST tmp196_AST = null;
+		tmp196_AST = astFactory.create(LT(1));
+		astFactory.addASTChild(currentAST, tmp196_AST);
 		match(StringLiteral);
 		dateLiteral_AST = (AST)currentAST.root;
 		returnAST = dateLiteral_AST;
@@ -3502,13 +3521,13 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST timeLiteral_AST = null;
 		
-		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp195_AST = null;
-		tmp195_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-		astFactory.makeASTRoot(currentAST, tmp195_AST);
+		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp197_AST = null;
+		tmp197_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+		astFactory.makeASTRoot(currentAST, tmp197_AST);
 		match(LITERAL_time);
-		AST tmp196_AST = null;
-		tmp196_AST = astFactory.create(LT(1));
-		astFactory.addASTChild(currentAST, tmp196_AST);
+		AST tmp198_AST = null;
+		tmp198_AST = astFactory.create(LT(1));
+		astFactory.addASTChild(currentAST, tmp198_AST);
 		match(StringLiteral);
 		timeLiteral_AST = (AST)currentAST.root;
 		returnAST = timeLiteral_AST;
@@ -3520,13 +3539,13 @@ public OQLParser(ParserSharedInputState state) {
 		ASTPair currentAST = new ASTPair();
 		AST timestampLiteral_AST = null;
 		
-		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp197_AST = null;
-		tmp197_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
-		astFactory.makeASTRoot(currentAST, tmp197_AST);
+		com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral tmp199_AST = null;
+		tmp199_AST = (com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral)astFactory.create(LT(1),"com.gemstone.gemfire.cache.query.internal.parse.ASTLiteral");
+		astFactory.makeASTRoot(currentAST, tmp199_AST);
 		match(LITERAL_timestamp);
-		AST tmp198_AST = null;
-		tmp198_AST = astFactory.create(LT(1));
-		astFactory.addASTChild(currentAST, tmp198_AST);
+		AST tmp200_AST = null;
+		tmp200_AST = astFactory.create(LT(1));
+		astFactory.addASTChild(currentAST, tmp200_AST);
 		match(StringLiteral);
 		timestampLiteral_AST = (AST)currentAST.root;
 		returnAST = timestampLiteral_AST;
@@ -3600,6 +3619,7 @@ public OQLParser(ParserSharedInputState state) {
 		"COUNT",
 		"MAX",
 		"MIN",
+		"UDA",
 		"\"trace\"",
 		"\"import\"",
 		"\"as\"",
@@ -3690,112 +3710,112 @@ public OQLParser(ParserSharedInputState state) {
 	};
 	
 	private static final long[] mk_tokenSet_0() {
-		long[] data = { 2327943626784L, 576460615267125096L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2327943626784L, 1152921230534250192L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_0 = new BitSet(mk_tokenSet_0());
 	private static final long[] mk_tokenSet_1() {
-		long[] data = { 2327970839714L, -3236942208L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970839714L, -6473884416L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_1 = new BitSet(mk_tokenSet_1());
 	private static final long[] mk_tokenSet_2() {
-		long[] data = { 0L, 360L, 0L, 0L};
+		long[] data = { 0L, 720L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_2 = new BitSet(mk_tokenSet_2());
 	private static final long[] mk_tokenSet_3() {
-		long[] data = { 2327943626784L, 576460615267124736L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2327943626784L, 1152921230534249472L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_3 = new BitSet(mk_tokenSet_3());
 	private static final long[] mk_tokenSet_4() {
-		long[] data = { 2327939432480L, 576460615267124224L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2327939432480L, 1152921230534248448L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_4 = new BitSet(mk_tokenSet_4());
 	private static final long[] mk_tokenSet_5() {
-		long[] data = { 210L, 2326528L, 0L, 0L};
+		long[] data = { 210L, 4653056L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_5 = new BitSet(mk_tokenSet_5());
 	private static final long[] mk_tokenSet_6() {
-		long[] data = { 2327970839794L, -3235647984L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970839794L, -6471295968L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_6 = new BitSet(mk_tokenSet_6());
 	private static final long[] mk_tokenSet_7() {
-		long[] data = { 210L, 2342912L, 0L, 0L};
+		long[] data = { 210L, 4685824L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_7 = new BitSet(mk_tokenSet_7());
 	private static final long[] mk_tokenSet_8() {
-		long[] data = { 25778192594L, 2326528L, 0L, 0L};
+		long[] data = { 25778192594L, 4653056L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_8 = new BitSet(mk_tokenSet_8());
 	private static final long[] mk_tokenSet_9() {
-		long[] data = { 2327939563552L, 576460615267124224L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2327939563552L, 1152921230534248448L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_9 = new BitSet(mk_tokenSet_9());
 	private static final long[] mk_tokenSet_10() {
-		long[] data = { 2327970840160L, -3237986800L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970840160L, -6475973600L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_10 = new BitSet(mk_tokenSet_10());
 	private static final long[] mk_tokenSet_11() {
-		long[] data = { 146L, 2293760L, 0L, 0L};
+		long[] data = { 146L, 4587520L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_11 = new BitSet(mk_tokenSet_11());
 	private static final long[] mk_tokenSet_12() {
-		long[] data = { 146L, 2162688L, 0L, 0L};
+		long[] data = { 146L, 4325376L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_12 = new BitSet(mk_tokenSet_12());
 	private static final long[] mk_tokenSet_13() {
-		long[] data = { 146L, 65536L, 0L, 0L};
+		long[] data = { 146L, 131072L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_13 = new BitSet(mk_tokenSet_13());
 	private static final long[] mk_tokenSet_14() {
-		long[] data = { 2327970839650L, -3237986816L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970839650L, -6475973632L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_14 = new BitSet(mk_tokenSet_14());
 	private static final long[] mk_tokenSet_15() {
-		long[] data = { 2327970839650L, -3237986800L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970839650L, -6475973600L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_15 = new BitSet(mk_tokenSet_15());
 	private static final long[] mk_tokenSet_16() {
-		long[] data = { 2327970839600L, -3237990912L, 2097151L, 0L, 0L, 0L};
+		long[] data = { 2327970839600L, -6475981824L, 4194303L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_16 = new BitSet(mk_tokenSet_16());
 	private static final long[] mk_tokenSet_17() {
-		long[] data = { 25769803776L, -36028797018963968L, 8191L, 0L, 0L, 0L};
+		long[] data = { 25769803776L, -72057594037927936L, 16383L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_17 = new BitSet(mk_tokenSet_17());
 	private static final long[] mk_tokenSet_18() {
-		long[] data = { 2328004394994L, 576460749080886800L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2328004394994L, 1152921498161773600L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_18 = new BitSet(mk_tokenSet_18());
 	private static final long[] mk_tokenSet_19() {
-		long[] data = { 2327939432480L, 576460615132906496L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2327939432480L, 1152921230265812992L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_19 = new BitSet(mk_tokenSet_19());
 	private static final long[] mk_tokenSet_20() {
-		long[] data = { 2328004394994L, 576460749080884752L, 2089665L, 0L, 0L, 0L};
+		long[] data = { 2328004394994L, 1152921498161769504L, 4179330L, 0L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_20 = new BitSet(mk_tokenSet_20());
 	private static final long[] mk_tokenSet_21() {
-		long[] data = { 25803359186L, 4964732944L, 0L, 0L};
+		long[] data = { 25803359186L, 9929465888L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_21 = new BitSet(mk_tokenSet_21());
@@ -3805,29 +3825,34 @@ public OQLParser(ParserSharedInputState state) {
 	}
 	public static final BitSet _tokenSet_22 = new BitSet(mk_tokenSet_22());
 	private static final long[] mk_tokenSet_23() {
-		long[] data = { 53248L, 25769803776L, 0L, 0L};
+		long[] data = { 53248L, 51539607552L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_23 = new BitSet(mk_tokenSet_23());
 	private static final long[] mk_tokenSet_24() {
-		long[] data = { 196608L, 103079215104L, 0L, 0L};
+		long[] data = { 196608L, 206158430208L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_24 = new BitSet(mk_tokenSet_24());
 	private static final long[] mk_tokenSet_25() {
-		long[] data = { 25818035154L, 133813751824L, 0L, 0L};
+		long[] data = { 25818035154L, 267627503648L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_25 = new BitSet(mk_tokenSet_25());
 	private static final long[] mk_tokenSet_26() {
-		long[] data = { 49152L, 412316860416L, 0L, 0L};
+		long[] data = { 49152L, 824633720832L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_26 = new BitSet(mk_tokenSet_26());
 	private static final long[] mk_tokenSet_27() {
-		long[] data = { 25834815442L, 133813760016L, 0L, 0L};
+		long[] data = { 25834815442L, 267627520032L, 0L, 0L};
 		return data;
 	}
 	public static final BitSet _tokenSet_27 = new BitSet(mk_tokenSet_27());
+	private static final long[] mk_tokenSet_28() {
+		long[] data = { 17179869184L, 34084860463104L, 0L, 0L};
+		return data;
+	}
+	public static final BitSet _tokenSet_28 = new BitSet(mk_tokenSet_28());
 	
 	}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/oql.g
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/oql.g b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/oql.g
index caf0ec0..a9f0af3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/oql.g
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/oql.g
@@ -343,6 +343,7 @@ tokens {
     COUNT;
     MAX;
     MIN;
+    UDA;
 }
 
 queryProgram :
@@ -919,7 +920,17 @@ conversionExpr :
               TOK_LPAREN!
               stringLiteral TOK_COMMA! stringLiteral
               TOK_RPAREN! 
-           )    
+           )
+         |
+           (
+            n:Identifier^<AST=com.gemstone.gemfire.cache.query.internal.parse.ASTAggregateFunc>
+              TOK_LPAREN! tokExpr1:expr TOK_RPAREN! 
+              { 
+                ((ASTAggregateFunc)#conversionExpr).setAggregateFunctionType(UDA);
+                ((ASTAggregateFunc)#conversionExpr).setUDAName(n.getText());
+               }
+           
+           )      
 	)
     ;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionAdvisor.java
index fc280f1..b5fde24 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionAdvisor.java
@@ -1545,6 +1545,19 @@ public class DistributionAdvisor  {
       this.peerMemberId = memberId;
       this.version = version;
     }
+    
+    /**
+     * Default no op method, which needs to be overridden by the profile class which needs to be
+     * collected and sent to the caller. The overriding method should fill in the profile & add it
+     * to the replyProfiles List 
+     * @param dm 
+     * @param adviseePath
+     * @param replyProfiles
+     */
+    public void collectProfile(DistributionManager dm, String adviseePath,
+        final List<Profile> replyProfiles) {
+      // nothing by default;
+    }
 
     /**
      * Return object that uniquely identifies this profile.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index 77f24a3..ed79de7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -70,6 +70,13 @@ import com.gemstone.gemfire.cache.query.internal.StructBag;
 import com.gemstone.gemfire.cache.query.internal.StructImpl;
 import com.gemstone.gemfire.cache.query.internal.StructSet;
 import com.gemstone.gemfire.cache.query.internal.Undefined;
+import com.gemstone.gemfire.cache.query.internal.aggregate.Avg;
+import com.gemstone.gemfire.cache.query.internal.aggregate.Count;
+import com.gemstone.gemfire.cache.query.internal.aggregate.DistinctAggregator;
+import com.gemstone.gemfire.cache.query.internal.aggregate.MaxMin;
+import com.gemstone.gemfire.cache.query.internal.aggregate.Sum;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDADistributionAdvisor;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAMessage;
 import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
 import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
 import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl;
@@ -275,6 +282,7 @@ import com.gemstone.gemfire.internal.cache.TXRemoteCommitMessage.TXRemoteCommitR
 import com.gemstone.gemfire.internal.cache.TXRemoteRollbackMessage;
 import com.gemstone.gemfire.internal.cache.Token;
 import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor.CollectAttributesMessage;
 import com.gemstone.gemfire.internal.cache.UpdateEntryVersionOperation.UpdateEntryVersionMessage;
 import com.gemstone.gemfire.internal.cache.UpdateOperation;
 import com.gemstone.gemfire.internal.cache.VMCachedDeserializable;
@@ -1043,6 +1051,14 @@ public final class DSFIDFactory implements DataSerializableFixedID {
         DestroyRegionOnDataStoreMessage.class);
     registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST,
         ShutdownAllGatewayHubsRequest.class);
+    registerDSFID(AGG_FUNC_AVG, Avg.class);
+    registerDSFID(AGG_FUNC_COUNT, Count.class);
+    registerDSFID(AGG_FUNC_SUM, Sum.class);
+    registerDSFID(AGG_FUNC_DISTINCT_AGG, DistinctAggregator.class);
+    registerDSFID(AGG_FUNC_MAX_MIN, MaxMin.class);
+    registerDSFID(UDA_PROFILE, UDADistributionAdvisor.UDAProfile.class);
+    registerDSFID(UDA_MESSAGE, UDAMessage.class);
+    registerDSFID(COLLECT_ATTRIBUTES_MESSAGE, CollectAttributesMessage.class);
     registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
   }
 
@@ -1098,39 +1114,48 @@ public final class DSFIDFactory implements DataSerializableFixedID {
 	  case PR_DESTROY_ON_DATA_STORE_MESSAGE:
         return readDestroyOnDataStore(in);
       default:
-        final Constructor<?> cons;
-        if (dsfid >= Byte.MIN_VALUE && dsfid <= Byte.MAX_VALUE) {
-          cons = dsfidMap[dsfid + Byte.MAX_VALUE + 1];
+        Object ds = getDSFIDInstance(dsfid);
+        InternalDataSerializer.invokeFromData(ds, in);
+        return ds;      
+
+    }
+  }
+  
+  /**
+   * Returns the instance of class which is registered with the given iD
+   * 
+   * @param dsfid ID with which the DataSerializableFixedID class is registered
+   * @return Object DataSerializableFixedID instance
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public static Object getDSFIDInstance(int dsfid) throws IOException, ClassNotFoundException {
+    final Constructor<?> cons;
+    if (dsfid >= Byte.MIN_VALUE && dsfid <= Byte.MAX_VALUE) {
+      cons = dsfidMap[dsfid + Byte.MAX_VALUE + 1];
+    } else {
+      cons = (Constructor<?>) dsfidMap2.get(dsfid);
+    }
+    if (cons != null) {
+      try {
+        return cons.newInstance((Object[]) null);
+      } catch (InstantiationException ie) {
+        throw new IOException(ie.getMessage(), ie);
+      } catch (IllegalAccessException iae) {
+        throw new IOException(iae.getMessage(), iae);
+      } catch (InvocationTargetException ite) {
+        Throwable targetEx = ite.getTargetException();
+        if (targetEx instanceof IOException) {
+          throw (IOException) targetEx;
+        } else if (targetEx instanceof ClassNotFoundException) {
+          throw (ClassNotFoundException) targetEx;
         } else {
-          cons = (Constructor<?>) dsfidMap2.get(dsfid);
-        }
-        if (cons != null) {
-          try {
-            Object ds = cons
-                    .newInstance((Object[]) null);
-            InternalDataSerializer.invokeFromData(ds, in);
-            return ds;
-          } catch (InstantiationException ie) {
-            throw new IOException(ie.getMessage(), ie);
-          } catch (IllegalAccessException iae) {
-            throw new IOException(iae.getMessage(), iae);
-          } catch (InvocationTargetException ite) {
-            Throwable targetEx = ite.getTargetException();
-            if (targetEx instanceof IOException) {
-              throw (IOException) targetEx;
-            } else if (targetEx instanceof ClassNotFoundException) {
-              throw (ClassNotFoundException) targetEx;
-            } else {
-              throw new IOException(ite.getMessage(), targetEx);
-            }
-          }
+          throw new IOException(ite.getMessage(), targetEx);
         }
-        throw new DSFIDNotFoundException("Unknown DataSerializableFixedID: "
-                + dsfid, dsfid);
-
+      }
     }
+    throw new DSFIDNotFoundException("Unknown DataSerializableFixedID: " + dsfid, dsfid);
   }
-
  
   //////////////////  Reading Internal Objects  /////////////////
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 5d52346..ccb4dc6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -646,7 +646,15 @@ public interface DataSerializableFixedID extends SerializationVersions {
   public static final short CUMULATIVE_RESULTS = 168;
   public static final short DISTTX_ROLLBACK_MESSAGE = 169;
   public static final short DISTTX_ROLLBACK_REPLY_MESSAGE = 170;
-  // 171..999 unused
+  public static final short AGG_FUNC_AVG = 171;
+  public static final short AGG_FUNC_COUNT = 172;
+  public static final short AGG_FUNC_SUM = 173;
+  public static final short AGG_FUNC_DISTINCT_AGG = 174;
+  public static final short AGG_FUNC_MAX_MIN = 175;
+  public static final short UDA_PROFILE = 176;
+  public static final short UDA_MESSAGE = 177;
+  public static final short COLLECT_ATTRIBUTES_MESSAGE = 178;
+  // 179..999 unused
 
   public static final short ADD_HEALTH_LISTENER_REQUEST = 1000;
   public static final short ADD_HEALTH_LISTENER_RESPONSE = 1001;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index cc9727b..1b9bf81 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -140,6 +140,8 @@ import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
 import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManagerImpl;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
 import com.gemstone.gemfire.cache.server.CacheServer;
@@ -491,6 +493,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * lock used to access prLockService
    */
   private final Object prLockServiceLock = new Object();
+  private final UDAManagerImpl udaMgr;
   
   /**
    * DistributedLockService for GatewaySenders. Remains null until the
@@ -859,7 +862,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
               .toLocalizedString());
         }
       }
-
+      this.udaMgr = new UDAManagerImpl();
       this.rootRegions = new HashMap();
       
       this.cqService = CqServiceProvider.create(this);
@@ -1196,6 +1199,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     
     startRestAgentServer(this);
     
+    udaMgr.collectUDAsFromRemote();
     int time = Integer.getInteger("gemfire.CLIENT_FUNCTION_TIMEOUT",
         DEFAULT_CLIENT_FUNCTION_TIMEOUT);
     clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
@@ -1975,6 +1979,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   public PersistentMemberManager getPersistentMemberManager() {
     return persistentMemberManager;
   }
+  
+  public UDAManagerImpl getUDAManager() {
+    return this.udaMgr;
+  }
 
   public ClientMetadataService getClientMetadataService() {
     synchronized (this.clientMetaDatServiceLock) {
@@ -2181,7 +2189,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
             }
             destroyPartitionedRegionLockService();
           }
-
+          udaMgr.clear();
           closeDiskStores();
           diskMonitor.close();
           

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
index 36eee80..06023b1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
 import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.internal.cache.extension.Extensible;
@@ -47,5 +48,7 @@ public interface InternalCache extends Cache, Extensible<Cache> {
   
   public Collection<HDFSStoreImpl> getHDFSStores() ;
   
+  public UDAManager getUDAManager() ;  
+  
   public <T extends CacheService> T getService(Class<T> clazz);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
index 85f058f..b45e5ff 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
 import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults.Metadata;
 import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver.IndexInfo;
 import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
 import com.gemstone.gemfire.cache.query.internal.OrderByComparator;
@@ -785,12 +786,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   /**
    * Applies order-by on the results returned from PR nodes and puts the results in 
    * the cumulative result set.
-   * The order-by is applied by running a generated query on the each result returned
-   * by the remote nodes.
-   * Example generated query: SELECT DISTINCT * FROM $1 p ORDER BY p.ID
-   * Where results are passed as bind parameter.
-   * This is added as quick turn-around, this is added based on most commonly used
-   * queries, needs to be investigated further.
+   *
    */   
   private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws QueryException {
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateAttributesProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateAttributesProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateAttributesProcessor.java
index 866eaff..71c58b6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateAttributesProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateAttributesProcessor.java
@@ -165,6 +165,50 @@ public class UpdateAttributesProcessor {
     this.processor = processor;
   }
 
+  /**
+   * Collects the profile from the remote peers.
+   * The profile to be collected is  defined by the ID contained in DataSerializableFixedID.getDSFID 
+   * for that profile. This method is different in the sense, it does not send this member's profile
+   * to the peers.
+   * Note: The dsfID should be that of a Profile type class .
+   * 
+   * @param dsfID ID contained in DataSerializableFixedID.getDSFID 
+   * for that profile
+   */
+  public void collect(int dsfID) {
+    DM mgr = this.advisee.getDistributionManager();
+    DistributionAdvisor advisor = this.advisee.getDistributionAdvisor();
+
+    final Set recipients = advisor.adviseProfileExchange();
+    if (recipients.isEmpty()) {
+      return;
+    }
+
+    ReplyProcessor21 processor = null;
+    // Scope scope = this.region.scope;
+
+    // always require an ack to prevent misordering of messages
+    InternalDistributedSystem system = this.advisee.getSystem();
+    processor = new UpdateAttributesReplyProcessor(system, recipients);
+    CollectAttributesMessage message = getCollectAttributesMessage(processor, recipients, dsfID);
+    mgr.putOutgoing(message);
+    this.processor = processor;
+    waitForProfileResponse();
+  }
+
+  CollectAttributesMessage getCollectAttributesMessage(ReplyProcessor21 processor, Set recipients, int dsfID) {
+
+    CollectAttributesMessage msg = new CollectAttributesMessage();
+    msg.dsfID = dsfID;
+
+    msg.adviseePath = this.advisee.getFullPath();
+    msg.setRecipients(recipients);
+    if (processor != null) {
+      msg.processorId = processor.getProcessorId();
+    }
+
+    return msg;
+  }
 
   UpdateAttributesMessage getUpdateAttributesMessage(ReplyProcessor21 processor,
                                                      Set recipients) {
@@ -549,4 +593,106 @@ public class UpdateAttributesProcessor {
       return true;
     }
   }
+  
+  public static final class CollectAttributesMessage extends HighPriorityDistributionMessage 
+  implements MessageWithReply {
+    protected int dsfID;
+
+    protected String adviseePath;
+    protected int processorId = 0;
+
+    @Override
+    public int getProcessorId() {
+      return this.processorId;
+    }
+
+    @Override
+    protected void process(DistributionManager dm) {
+      Throwable thr = null;
+
+      boolean sendReply = this.processorId != 0;
+      List<Profile> replyProfiles = null;
+      try {
+        // create a dummy profile object
+        Profile dummyProfile = (Profile) DSFIDFactory.getDSFIDInstance(this.dsfID);
+        replyProfiles = new ArrayList<Profile>();
+        dummyProfile.collectProfile(dm, this.adviseePath, replyProfiles);
+      } catch (CancelException e) {
+
+      } catch (Throwable t) {
+        Error err;
+        if (t instanceof Error && SystemFailure.isJVMFailureError(err = (Error) t)) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error. We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        }
+        // Whenever you catch Error or Throwable, you must also
+        // check for fatal JVM error (see above). However, there is
+        // _still_ a possibility that you are dealing with a cascading
+        // error condition, so you also need to check to see if the JVM
+        // is still usable:
+        SystemFailure.checkFailure();
+        thr = t;
+      } finally {
+        if (sendReply) {
+          ReplyException rex = null;
+          if (thr != null) {
+            rex = new ReplyException(thr);
+          }
+          if (replyProfiles == null || replyProfiles.size() <= 1) {
+            Profile p = null;
+            if (replyProfiles != null && replyProfiles.size() == 1) {
+              p = replyProfiles.get(0);
+            }
+            ProfileReplyMessage.send(getSender(), this.processorId, rex, dm, p);
+          } else {
+            Profile[] profiles = new Profile[replyProfiles.size()];
+            replyProfiles.toArray(profiles);
+            ProfilesReplyMessage.send(getSender(), this.processorId, rex, dm, profiles);
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buff = new StringBuilder();
+      buff.append("CollectAttributesMessage (adviseePath=");
+      buff.append(this.adviseePath);
+      buff.append("; processorId=");
+      buff.append(this.processorId);
+      buff.append("; dsfid=");
+      buff.append(this.dsfID);
+
+      buff.append(")");
+      return buff.toString();
+    }
+
+    public int getDSFID() {
+      return COLLECT_ATTRIBUTES_MESSAGE;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.adviseePath = DataSerializer.readString(in);
+      this.processorId = in.readInt();
+      this.dsfID = in.readInt();
+
+      // set the processor ID to be able to send reply to sender in case of any
+      // unexpected exception during deserialization etc.
+      ReplyProcessor21.setMessageRPId(this.processorId);
+
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeString(this.adviseePath, out);
+      out.writeInt(this.processorId);
+      out.writeInt(this.dsfID);
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
index 915bde9..a8d020d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
@@ -74,10 +74,14 @@ import com.gemstone.gemfire.cache.query.IndexInvalidException;
 import com.gemstone.gemfire.cache.query.IndexNameConflictException;
 import com.gemstone.gemfire.cache.query.IndexType;
 import com.gemstone.gemfire.cache.query.MultiIndexCreationException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
 import com.gemstone.gemfire.cache.query.Query;
 import com.gemstone.gemfire.cache.query.QueryInvalidException;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManagerImpl;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.snapshot.CacheSnapshotService;
@@ -199,7 +203,8 @@ public class CacheCreation implements InternalCache {
    */
   protected final Map diskStores = new LinkedHashMap();
   protected final Map hdfsStores = new LinkedHashMap();
-  
+  private final UDAManagerCreation udaMgrCreation = new UDAManagerCreation();
+
   private final List<File> backups = new ArrayList<File>();
 
   private CacheConfig cacheConfig = new CacheConfig();
@@ -311,6 +316,10 @@ public class CacheCreation implements InternalCache {
     return this.searchTimeout;
   }
 
+  public void addUDA(String udaName, String udaClass) {
+    this.udaMgrCreation.createUDA(udaName, udaClass);
+  }
+
   public void setSearchTimeout(int seconds) {
     this.searchTimeout = seconds;
     this.hasSearchTimeout = true;
@@ -459,7 +468,18 @@ public class CacheCreation implements InternalCache {
         cache.getCacheTransactionManager()!=null) {
       cache.getCacheTransactionManager().setWriter(this.txMgrCreation.getWriter());
     }
-    
+
+    UDAManagerImpl udaMgrImpl = cache.getUDAManager();
+    for (Map.Entry<String, String> entry : this.udaMgrCreation.getUDAs().entrySet()) {
+      try {
+        udaMgrImpl.createUDA(entry.getKey(), entry.getValue());
+      } catch (UDAExistsException udae) {
+        throw new RuntimeException(LocalizedStrings.UDA_MANAGER_Uda_Exists.toLocalizedString(entry.getKey()));
+      } catch (NameResolutionException nre) {
+        throw new RuntimeException(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(entry.getValue(), entry.getKey()));
+      }
+
+    }
     for (GatewaySender senderCreation : this.getGatewaySenders()) {
       GatewaySenderFactory factory = (GatewaySenderFactory)cache
           .createGatewaySenderFactory();
@@ -1684,10 +1704,25 @@ public class CacheCreation implements InternalCache {
     public boolean clearDefinedIndexes() {
       throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
     }
-    
+
+    @Override
+    public void createUDA(String udaName, String udaClass) throws UDAExistsException, NameResolutionException {
+      getUDAManager().createUDA(udaName, udaClass);
+
+    }
+
+    @Override
+    public void removeUDA(String udaName) {
+      getUDAManager().removeUDA(udaName);
+    }
+
   };
 
   @Override
+  public UDAManager getUDAManager() {
+    return this.udaMgrCreation;
+  }
+  
   public <T extends CacheService> T getService(Class<T> clazz) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index c6b0509..66091d2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -625,6 +625,11 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
 
   /** Name of region property specifying the cloning **/
   public static final String CLONING_ENABLED = "cloning-enabled";
+  /** The name of the <code>uda</code> element */
+  public static final String UDA_MANAGER = "uda-manager";
+  public static final String UDA = "uda";
+  public static final String UDA_NAME = "name";
+  public static final String UDA_CLASS = "class";
 
   // begin constants for connection pool
   public static final String CONNECTION_POOL = "pool";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index ea3c975..d2ac140 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -58,6 +58,7 @@ import org.xml.sax.XMLReader;
 import org.xml.sax.ext.LexicalHandler;
 import org.xml.sax.helpers.AttributesImpl;
 
+import com.gemstone.gemfire.internal.cache.InternalCache;
 import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
@@ -99,7 +100,9 @@ import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionService;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
+import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
 import com.gemstone.gemfire.cache.query.internal.index.HashIndex;
 import com.gemstone.gemfire.cache.query.internal.index.PrimaryKeyIndex;
 import com.gemstone.gemfire.cache.server.CacheServer;
@@ -535,7 +538,11 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
       } else if(this.version.compareTo(CacheXmlVersion.GEMFIRE_6_6) >= 0) {
         generate(this.cache.getCacheTransactionManager());
       }
-
+     
+      if (this.version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) {
+        generateUDA(this.cache);
+      }
+     
       generateDynamicRegionFactory(this.cache);
 
       if (!isClientCache) {
@@ -1391,6 +1398,23 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
     }
     handler.endElement("", DYNAMIC_REGION_FACTORY, DYNAMIC_REGION_FACTORY);
   }
+  
+  private void generateUDA(Cache c) throws SAXException {
+    UDAManager udaMgr = ((InternalCache)c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();
+    if (map.isEmpty()) {
+      return;
+    }
+    handler.startElement("", UDA_MANAGER, UDA_MANAGER, EMPTY);
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      AttributesImpl atts = new AttributesImpl();
+      atts.addAttribute("", "", UDA_NAME, "", entry.getKey());
+      atts.addAttribute("", "", UDA_CLASS, "", entry.getValue());
+      handler.startElement("", UDA, UDA, atts);
+      handler.endElement("", UDA, UDA);
+    }
+    handler.endElement("", UDA_MANAGER, UDA_MANAGER);
+  }
 
   private void generateGatewaySender(GatewaySender sender) throws SAXException {
       AttributesImpl atts = new AttributesImpl();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index e6c0b60..256f5ef 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -90,6 +90,9 @@ import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.query.IndexType;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
 import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
@@ -1967,6 +1970,23 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     this.stack.push(icd);
   }
   
+  private void startUDA(Attributes atts) {
+    String name = atts.getValue(UDA_NAME);
+    String clazz = atts.getValue(UDA_CLASS);
+    Map<String, String> map = (Map<String, String>)this.stack.peek();
+    map.put(name, clazz);
+  }
+  private void startUDAManager() {
+    this.stack.push(new HashMap<String, String>());
+  }
+  
+  private void endUDAManager() {
+    Map<String, String> map = (Map<String, String>)this.stack.pop();
+    for(Map.Entry<String, String> entry : map.entrySet()) {
+      this.cache.addUDA(entry.getKey(), entry.getValue());
+    }
+  }
+  
   /**
    * When index element is ending we need to verify all attributes because of
    * new index tag definition since 6.6.1 and support previous definition also. 
@@ -2931,6 +2951,11 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       //push it in stack
       startIndex(atts);
       //this.stack.push(new IndexCreationData(atts.getValue(NAME)));
+    }else if (qName.equals(UDA_MANAGER)) {     
+      startUDAManager();      
+    }
+    else if (qName.equals(UDA)) {     
+      startUDA(atts);      
     }
     else if (qName.equals(FUNCTIONAL)) {
       startFunctionalIndex(atts);
@@ -3300,6 +3325,10 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       else if (qName.equals(INDEX)) {
         endIndex();
       }
+      else if (qName.equals(UDA_MANAGER)) {
+        endUDAManager();
+      }else if (qName.equals(UDA)) {        
+      }
       else if (qName.equals(PRIMARY_KEY)) {
       }
       else if (qName.equals(TRANSACTION_MANAGER)) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
index 8285a65..7d57b8a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
@@ -2359,11 +2359,11 @@ class ParentLocalizedStrings {
   public static final StringId TransactionImpl_TRANSACTIONIMPLREGISTERSYNCHRONIZATIONSYNCHRONIZATION_IS_NULL = new StringId(3362, "TransactionImpl::registerSynchronization:Synchronization is null");
   public static final StringId TransactionManagerImpl_NO_TRANSACTION_PRESENT = new StringId(3363, "no transaction present");
   public static final StringId TransactionManagerImpl_TRANSACTIONMANAGER_INVALID = new StringId(3364, "TransactionManager invalid");
-  // okay to reuse 3365
-  // okay to reuse 3366
-  // okay to reuse 3367
-  // okay to reuse 3368
-  // okay to reuse 3369
+  public static final StringId UDA_MANAGER_Class_Not_Found = new StringId(3365, "The class {0} for UDA {1} could not be found");
+  public static final StringId UDA_MANAGER_Uda_Exists = new StringId(3366, "UDA with name {0} already exists");
+  public static final StringId UDA_MANAGER_Class_Conflict = new StringId(3367, "UDA with name {0} is already registered with UDA class {1} which conflicts with the UDA class {2}");
+  public static final StringId UDA_MANAGER_Aggregator_Not_Found= new StringId(3368, "No UDA registered with name {0}");
+  public static final StringId UDA_MANAGER_Udas_Not_Collected= new StringId(3369, "UDAs defined in the system could not be collected");  
   // okay to reuse 3370
   // okay to reuse 3371
   // okay to reuse 3372

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
----------------------------------------------------------------------
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 5ecd67d..26a18de 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -77,6 +77,18 @@ declarative caching XML file elements unless indicated otherwise.
       <xsd:sequence>
         <xsd:element maxOccurs="1" minOccurs="0" name="cache-transaction-manager" type="gf:cache-transaction-manager-type" />
         <xsd:element maxOccurs="1" minOccurs="0" name="dynamic-region-factory" type="gf:dynamic-region-factory-type" />
+        <xsd:element maxOccurs="1" minOccurs="0" name="uda-manager">
+          <xsd:complexType>
+           <xsd:sequence>
+            <xsd:element maxOccurs="unbounded" minOccurs="0" name="uda">
+              <xsd:complexType>
+                <xsd:attribute name="name" type="xsd:string" use="required" />
+                <xsd:attribute name="class" type="xsd:string" use="required" />
+              </xsd:complexType>
+            </xsd:element>
+          </xsd:sequence>
+         </xsd:complexType>
+        </xsd:element>  
         <xsd:element maxOccurs="unbounded" minOccurs="0" name="gateway-hub">
           <xsd:annotation>
             <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/GroupByPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/GroupByPartitionedQueryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/GroupByPartitionedQueryDUnitTest.java
index b005463..d3afe36 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/GroupByPartitionedQueryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/GroupByPartitionedQueryDUnitTest.java
@@ -22,12 +22,6 @@ import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.query.Index;
-import com.gemstone.gemfire.cache.query.IndexExistsException;
-import com.gemstone.gemfire.cache.query.IndexInvalidException;
-import com.gemstone.gemfire.cache.query.IndexNameConflictException;
-import com.gemstone.gemfire.cache.query.IndexType;
-import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.SerializableRunnable;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunctionJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunctionJUnitTest.java
index fc793bf..a495684 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunctionJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunctionJUnitTest.java
@@ -29,19 +29,13 @@ import org.junit.experimental.categories.Category;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.internal.aggregate.Avg;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgBucketNode;
 import com.gemstone.gemfire.cache.query.internal.aggregate.AvgDistinct;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgDistinctPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgPRQueryNode;
 import com.gemstone.gemfire.cache.query.internal.aggregate.Count;
 import com.gemstone.gemfire.cache.query.internal.aggregate.CountDistinct;
-import com.gemstone.gemfire.cache.query.internal.aggregate.CountDistinctPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.CountPRQueryNode;
 import com.gemstone.gemfire.cache.query.internal.aggregate.DistinctAggregator;
 import com.gemstone.gemfire.cache.query.internal.aggregate.MaxMin;
 import com.gemstone.gemfire.cache.query.internal.aggregate.Sum;
 import com.gemstone.gemfire.cache.query.internal.aggregate.SumDistinct;
-import com.gemstone.gemfire.cache.query.internal.aggregate.SumDistinctPRQueryNode;
 import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
@@ -73,12 +67,7 @@ public class CompiledAggregateFunctionJUnitTest extends TestCase {
     ExecutionContext context2 = new ExecutionContext(null,cache);
     assertTrue(caf2.evaluate(context2) instanceof CountDistinct);
 
-    CompiledAggregateFunction caf3 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.COUNT);
-    ExecutionContext context3 = new ExecutionContext(null,cache);
-    context3.setIsPRQueryNode(true);
-    assertTrue(caf3.evaluate(context3) instanceof CountPRQueryNode);
-
+   
     CompiledAggregateFunction caf4 = new CompiledAggregateFunction(null,
         OQLLexerTokenTypes.COUNT);
     QueryExecutionContext context4 = new QueryExecutionContext(null,cache);
@@ -86,12 +75,7 @@ public class CompiledAggregateFunctionJUnitTest extends TestCase {
     context4.setBucketList(bucketList);
     assertTrue(caf4.evaluate(context4) instanceof Count);
 
-    CompiledAggregateFunction caf5 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.COUNT, true);
-    ExecutionContext context5 = new ExecutionContext(null,cache);
-    context5.setIsPRQueryNode(true);
-    assertTrue(caf5.evaluate(context5) instanceof CountDistinctPRQueryNode);
-
+   
     CompiledAggregateFunction caf6 = new CompiledAggregateFunction(null,
         OQLLexerTokenTypes.COUNT, true);
     QueryExecutionContext context6 = new QueryExecutionContext(null,cache);
@@ -123,12 +107,7 @@ public class CompiledAggregateFunctionJUnitTest extends TestCase {
     context4.setBucketList(bucketList);
     assertTrue(caf4.evaluate(context4) instanceof Sum);
 
-    CompiledAggregateFunction caf5 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.SUM, true);
-    ExecutionContext context5 = new ExecutionContext(null,cache);
-    context5.setIsPRQueryNode(true);
-    assertTrue(caf5.evaluate(context5) instanceof SumDistinctPRQueryNode);
-
+ 
     CompiledAggregateFunction caf6 = new CompiledAggregateFunction(null,
         OQLLexerTokenTypes.SUM, true);
     QueryExecutionContext context6 = new QueryExecutionContext(null,cache);
@@ -148,24 +127,7 @@ public class CompiledAggregateFunctionJUnitTest extends TestCase {
     ExecutionContext context2 = new ExecutionContext(null,cache);
     assertTrue(caf2.evaluate(context2) instanceof AvgDistinct);
 
-    CompiledAggregateFunction caf3 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.AVG);
-    ExecutionContext context3 = new ExecutionContext(null,cache);
-    context3.setIsPRQueryNode(true);
-    assertTrue(caf3.evaluate(context3) instanceof AvgPRQueryNode);
-
-    CompiledAggregateFunction caf4 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.AVG);
-    QueryExecutionContext context4 = new QueryExecutionContext(null,cache);
-    context4.setBucketList(this.bucketList);
-    assertTrue(caf4.evaluate(context4) instanceof AvgBucketNode);
-
-    CompiledAggregateFunction caf5 = new CompiledAggregateFunction(null,
-        OQLLexerTokenTypes.AVG, true);
-    ExecutionContext context5 = new ExecutionContext(null,cache);
-    context5.setIsPRQueryNode(true);
-    assertTrue(caf5.evaluate(context5) instanceof AvgDistinctPRQueryNode);
-
+   
     CompiledAggregateFunction caf6 = new CompiledAggregateFunction(null,
         OQLLexerTokenTypes.AVG, true);
     QueryExecutionContext context6 = new QueryExecutionContext(null,cache);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/aggregate/AggregatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/aggregate/AggregatorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/aggregate/AggregatorJUnitTest.java
index 9645b4e..e7eba61 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/aggregate/AggregatorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/internal/aggregate/AggregatorJUnitTest.java
@@ -16,8 +16,9 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.math.BigDecimal;
+
+import junit.framework.TestCase;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -41,9 +42,20 @@ public class AggregatorJUnitTest extends TestCase{
     count.accumulate(null);
     assertEquals(2, ((Number)count.terminate()).intValue());
     
-    CountPRQueryNode countPrQ = new CountPRQueryNode();
-    countPrQ.accumulate(new Integer(5));
-    countPrQ.accumulate(new Integer(6));    
+    Count  countPrQ = new Count();
+    Count count1 = new Count();
+    for(int i = 1; i <=5; ++i) {
+      count1.accumulate(100);
+    }
+    
+    Count count2 = new Count();
+    for(int i = 1; i <=6; ++i) {
+      count2.accumulate(100);
+    }
+    
+    countPrQ.merge(count1);
+    countPrQ.merge(count2);
+       
     assertEquals(11, ((Number)countPrQ.terminate()).intValue());    
   }
   
@@ -58,20 +70,20 @@ public class AggregatorJUnitTest extends TestCase{
     count.accumulate(null);
     assertEquals(2, ((Number)count.terminate()).intValue());
     
-    CountDistinctPRQueryNode cdpr = new CountDistinctPRQueryNode();
+    CountDistinct cdpr = new CountDistinct();
     
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(1);
-    set1.add(2);
-    set1.add(3);
+    CountDistinct cd1  = new CountDistinct();
+    cd1.accumulate(1);
+    cd1.accumulate(2);
+    cd1.accumulate(3);
     
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(4);
-    set2.add(5);
+    CountDistinct cd2  = new CountDistinct();
+    cd1.accumulate(3);
+    cd1.accumulate(4);
+    cd1.accumulate(5);
     
-    cdpr.accumulate(set1);
-    cdpr.accumulate(set2);
+    cdpr.merge(cd1);
+    cdpr.merge(cd2);
     assertEquals(5, ((Number)cdpr.terminate()).intValue());    
   }
   
@@ -81,7 +93,25 @@ public class AggregatorJUnitTest extends TestCase{
     sum.accumulate(new Integer(5));
     sum.accumulate(new Integer(6));
     sum.accumulate(null);
-    assertEquals(11, ((Number)sum.terminate()).intValue());      
+    assertEquals(11, ((Number)sum.terminate()).intValue());  
+    
+    Sum sumPR = new Sum();
+    
+    Sum sum1 = new Sum();
+    sum1.accumulate(new Integer(5));
+    sum1.accumulate(new Integer(6));
+    sum1.accumulate(null);
+    
+    Sum sum2 = new Sum();
+    sum2.accumulate(new Integer(7));
+    sum2.accumulate(new Integer(8));
+    
+    
+    sumPR.merge(sum1);
+    sumPR.merge(sum2);
+    
+    assertEquals(26, ((Number)sumPR.terminate()).intValue());  
+    
   }
   
   @Test
@@ -94,20 +124,22 @@ public class AggregatorJUnitTest extends TestCase{
     sum.accumulate(new Integer(6));
     assertEquals(11, ((Number)sum.terminate()).intValue());
     
-    SumDistinctPRQueryNode sdpr = new SumDistinctPRQueryNode();
+    SumDistinct sdpr = new SumDistinct();
+   
+    SumDistinct sd1 = new SumDistinct();   
+    sd1.accumulate(5);
+    sd1.accumulate(6);
+    sd1.accumulate(3);
+    
+    SumDistinct sd2 = new SumDistinct();    
+    sd1.accumulate(3);
+    sd1.accumulate(7);
+    sd1.accumulate(8);
+   
+    
+    sdpr.merge(sd1);
+    sdpr.merge(sd2);
    
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(5);
-    set1.add(6);
-    set1.add(3);
-    
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(7);
-    set2.add(8);
-    
-    sdpr.accumulate(set1);
-    sdpr.accumulate(set2);
     assertEquals(29, ((Number)sdpr.terminate()).intValue());
   }
   
@@ -128,29 +160,27 @@ public class AggregatorJUnitTest extends TestCase{
     assertEquals(expected, ((Number)avg.terminate()).floatValue());
     
    
-    AvgBucketNode abn = new AvgBucketNode();
-    abn.accumulate(new Integer(1));
-    abn.accumulate(new Integer(2));
-    abn.accumulate(new Integer(3));
-    abn.accumulate(new Integer(4));
-    abn.accumulate(new Integer(5));
-    abn.accumulate(new Integer(6));
-    abn.accumulate(new Integer(7));
-    abn.accumulate(new Integer(7));
-    abn.accumulate(null);
-    abn.accumulate(null); 
-    Object[] arr = (Object[]) abn.terminate();
-    assertEquals(8, ((Integer)arr[0]).intValue());
-    assertEquals(35, ((Number)arr[1]).intValue());
-    
-    
-    AvgPRQueryNode apqn = new AvgPRQueryNode();
-    Object[] val1 = new Object[]{new Integer(7), new Double(43)};
-    Object[] val2 = new Object[]{new Integer(5), new Double(273.86)};
-    apqn.accumulate(val1);
-    apqn.accumulate(val2);
-    expected = (43+273.86f)/12.0f ;
-    assertEquals(expected, ((Number)apqn.terminate()).floatValue());    
+    Avg avgQ = new Avg();
+    
+    Avg abn1 = new Avg();
+    abn1.accumulate(new Integer(1));
+    abn1.accumulate(new Integer(2));
+    abn1.accumulate(new Integer(3));
+    abn1.accumulate(new Integer(4));
+    
+    Avg abn2 = new Avg();
+    abn2.accumulate(new Integer(5));
+    abn2.accumulate(new Integer(6));
+    abn2.accumulate(new Integer(7));
+    abn2.accumulate(new Integer(8));
+    abn2.accumulate(null);
+    abn2.accumulate(null); 
+    
+    avgQ.merge(abn1);
+    avgQ.merge(abn2);
+    
+    expected = (1+2+3 +4 +5 +6 + 7 + 8)/8.0f ;
+    assertEquals(expected, ((Number)avgQ.terminate()).floatValue());    
   }
   
   @Test
@@ -173,22 +203,22 @@ public class AggregatorJUnitTest extends TestCase{
     assertEquals(expected, ((Number)avg.terminate()).floatValue());
     
    
-    AvgDistinctPRQueryNode adpqn = new AvgDistinctPRQueryNode();
-    
-    Set<Integer> set1 = new HashSet<Integer>();
-    set1.add(5);
-    set1.add(6);
-    set1.add(3);
-    set1.add(4);
-    
-    Set<Integer> set2 = new HashSet<Integer>();
-    set2.add(3);
-    set2.add(7);
-    set2.add(8);
-    set2.add(4);
-    
-    adpqn.accumulate(set1);
-    adpqn.accumulate(set2);
+    AvgDistinct adpqn = new AvgDistinct();
+    
+    AvgDistinct ad1 = new AvgDistinct();
+    ad1.accumulate(5);
+    ad1.accumulate(6);
+    ad1.accumulate(3);
+    ad1.accumulate(4);
+    
+    AvgDistinct ad2 = new AvgDistinct();
+    ad2.accumulate(3);
+    ad2.accumulate(7);
+    ad2.accumulate(8);
+    ad2.accumulate(4);
+    
+    adpqn.merge(ad1);
+    adpqn.merge(ad2);
    
     expected = (3+4+5+6+7+8)/6.0f ;
     assertEquals(expected, ((Number)adpqn.terminate()).floatValue());    
@@ -210,5 +240,30 @@ public class AggregatorJUnitTest extends TestCase{
     min.accumulate(null);
     assertEquals(1,((Integer)min.terminate()).intValue());
   }
+  
+  @Test 
+  public void testDowncast() throws Exception {
+   new BigDecimal(1d).longValueExact();
+    Sum sum = new Sum();
+    sum.accumulate(new Integer(Integer.MAX_VALUE));
+    sum.accumulate(new Integer(6));
+    Number result = (Number)sum.terminate();
+    assertTrue( result instanceof Long);
+    assertEquals(Integer.MAX_VALUE + 6l, result.longValue());
+    
+    sum = new Sum();
+    sum.accumulate(new Long(5));
+    sum.accumulate(new Integer(6));
+    result = (Number)sum.terminate();
+    assertTrue( result instanceof Integer);
+    assertEquals(11, result.intValue());
+    
+    sum = new Sum();
+    sum.accumulate(Float.MAX_VALUE);
+    result = (Number)sum.terminate();
+    assertTrue( result instanceof Double);
+    assertEquals(Float.MAX_VALUE, result.floatValue());
+    
+  }
  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
index 57e3a13..f3959e4 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
@@ -22,7 +22,10 @@ package com.gemstone.gemfire.cache30;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
@@ -30,6 +33,7 @@ import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
 import com.gemstone.gemfire.internal.cache.xmlcache.ResourceManagerCreation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
+import java.util.Map;
 
 
 public class CacheXmlGeode10DUnitTest extends CacheXml81DUnitTest {
@@ -231,4 +235,74 @@ public class CacheXmlGeode10DUnitTest extends CacheXml81DUnitTest {
       System.clearProperty("gemfire."+DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
     }
   }
+  
+  public void testUDA() {
+    final CacheCreation cache = new CacheCreation();
+    cache.addUDA("uda1", UDACLass1.class.getName());
+    cache.addUDA("uda2", UDACLass2.class.getName());
+    cache.addUDA("uda3", UDACLass3.class.getName());
+    testXml(cache);
+
+    final Cache c = getCache();
+    assertNotNull(c);
+    UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+    assertEquals(map.get("uda2"), UDACLass2.class.getName());
+    assertEquals(map.get("uda3"), UDACLass3.class.getName());
+  }
+  
+  public class UDACLass1 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {}
+
+    @Override
+    public void init() {}
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {}
+    
+  }
+  
+  public class UDACLass2 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {}
+
+    @Override
+    public void init() {}
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {}
+    
+  }
+  
+  public class UDACLass3 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {}
+
+    @Override
+    public void init() {}
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {}
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlTestCase.java
index 8c9776e..dd09247 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlTestCase.java
@@ -38,7 +38,7 @@ public class CacheXmlTestCase extends CacheTestCase {
   private File xmlFile;
   
   /** set this to false if a test needs a non-loner distributed system */
-  static boolean lonerDistributedSystem = true;
+  protected static boolean lonerDistributedSystem = true;
 
   public CacheXmlTestCase(String name) {
     super(name);



[5/6] incubator-geode git commit: classes for implementing UDA functionality

Posted by up...@apache.org.
classes for implementing UDA functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/42fb6fc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/42fb6fc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/42fb6fc0

Branch: refs/heads/feature/GEODE-1269
Commit: 42fb6fc0cff310c976f7ab1e5621c59ad4aabf01
Parents: 8bc7afa
Author: Asif Shahid <as...@snappydata.io>
Authored: Thu Apr 14 22:12:25 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Thu Apr 14 22:12:25 2016 -0700

----------------------------------------------------------------------
 .../gemfire/cache/query/UDAExistsException.java |  43 ++
 .../query/internal/CompiledUDAFunction.java     |  67 +++
 .../aggregate/uda/UDADistributionAdvisor.java   | 187 +++++++++
 .../internal/aggregate/uda/UDAManager.java      |  35 ++
 .../internal/aggregate/uda/UDAManagerImpl.java  | 229 +++++++++++
 .../internal/aggregate/uda/UDAMessage.java      | 106 +++++
 .../cache/xmlcache/UDAManagerCreation.java      |  44 ++
 .../cache/query/dunit/UDACreationDUnitTest.java | 403 +++++++++++++++++++
 .../gemfire/cache/query/dunit/UDADUnitImpl.java | 122 ++++++
 .../dunit/UDAPartitionedQueryDUnitTest.java     | 102 +++++
 .../functional/UDAPartitionedJUnitTest.java     |  40 ++
 .../functional/UDAReplicatedJUnitTest.java      |  34 ++
 .../cache/query/functional/UDATestImpl.java     | 140 +++++++
 .../query/functional/UDATestInterface.java      |  22 +
 14 files changed, 1574 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
new file mode 100644
index 0000000..e40a80a
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/UDAExistsException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query;
+
+/**
+ * This Exception is thrown if a UDA with a given name already exists in the system,
+ * and a new UDA is being registered with the same name
+ * 
+ * @author ashahid
+ *
+ */
+public class UDAExistsException extends QueryException{
+
+  private static final long serialVersionUID = -1643528839352144L;  
+  
+  public UDAExistsException(String msg) {
+    super(msg);
+  }
+  
+  /**
+   * Constructs instance of UDAExistsException with error message and cause
+   * @param msg the error message
+   * @param cause a Throwable that is a cause of this exception
+   */
+  public UDAExistsException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
new file mode 100644
index 0000000..e00b909
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledUDAFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.query.internal;
+
+import com.gemstone.gemfire.cache.CacheRuntimeException;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Represents the UserDefinedAggregate function node
+ * @author ashahid
+ * @since 9.0
+ *
+ */
+public class CompiledUDAFunction extends CompiledAggregateFunction {
+  private final String udaName;
+
+  public CompiledUDAFunction(CompiledValue expr, int aggFunc, String name) {
+    super(expr, aggFunc);
+    this.udaName = name;
+  }
+
+  @Override
+  public Aggregator evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+                                                      QueryInvocationTargetException {
+    Class<Aggregator> aggregatorClass = ((GemFireCacheImpl) context.getCache()).getUDAManager().getUDAClass(this.udaName);
+    try {
+      return aggregatorClass.newInstance();
+    } catch (Exception e) {
+      throw new CacheRuntimeException(e) {
+      };
+    }
+
+  }
+
+  @Override
+  public ObjectType getObjectType() {
+    return new ObjectTypeImpl(Object.class);
+  }
+
+  @Override
+  protected String getStringRep() {
+    return this.udaName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
new file mode 100644
index 0000000..2d65cb7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDADistributionAdvisor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Handles the exchange of UDAProfile with the peers. 
+ * It is used for exchanging information about the UDAs existing in the system.
+ * @author ashahid
+ * @since 9.0
+ */
+public class UDADistributionAdvisor extends DistributionAdvisor {
+
+  private UDADistributionAdvisor(DistributionAdvisee sender) {
+    super(sender);
+
+  }
+
+  public static UDADistributionAdvisor createUDAAdvisor(DistributionAdvisee sender) {
+    UDADistributionAdvisor advisor = new UDADistributionAdvisor(sender);
+    advisor.initialize();
+    return advisor;
+  }
+
+  /** Instantiate new Sender profile for this member */
+  @Override
+  protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
+    return new UDAProfile(memberId, version);
+  }
+
+  @Override
+  public Set<InternalDistributedMember> adviseProfileExchange() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DM dm = ids.getDistributionManager();
+    InternalDistributedMember elder = dm.getElderId();
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    if (elder == null || elder.equals(dm.getId()) || locators.contains(elder)) {
+      elder = null;
+      Set<InternalDistributedMember> allMembers = gfc.getDistributionAdvisor().adviseGeneric();
+      Iterator<InternalDistributedMember> iter = allMembers.iterator();
+      while(iter.hasNext()) {
+        InternalDistributedMember temp = iter.next();
+        if(!locators.contains(temp)) {
+          elder = temp;
+          break;
+        }
+      }
+    }
+    if (elder != null) {
+      return Collections.singleton(elder);
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public Set<InternalDistributedMember> adviseProfileUpdate() {
+    InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();   
+    DM dm = ids.getDistributionManager();
+    
+    Set<InternalDistributedMember> locators = dm.getAllHostedLocators().keySet();
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting(); 
+    Set<InternalDistributedMember> all = gfc.getDistributionAdvisor().adviseGeneric();
+    all.removeAll(locators);
+    return all;
+  }
+
+  /**
+   * Create or update a profile for a remote counterpart.
+   * 
+   * @param profile
+   *          the profile, referenced by this advisor after this method returns.
+   */
+  @Override
+  public boolean putProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    GemFireCacheImpl.getExisting().getUDAManager().addUDAs(udap.udas, udap.getCheckforLocalOnlyUDAs());
+    return true;
+  }
+
+  @Override
+  public boolean removeProfile(Profile profile, boolean destroyed) {
+    return true;
+  }
+  
+  @Override
+  public boolean initializationGate() {    
+    return false;
+  }
+
+  /**
+   * Profile information for a remote counterpart.
+   */
+  public static final class UDAProfile extends DistributionAdvisor.Profile {
+
+    private HashMap<String, String> udas = new HashMap<String, String>();
+    private boolean checkForLocalOnlyUDAs = false;
+    public UDAProfile(InternalDistributedMember memberId, int version) {
+      super(memberId, version);
+    }
+
+    public UDAProfile() {
+    }
+    
+    void setCheckForLocalOnlyUDAsAsTrue() {
+      this.checkForLocalOnlyUDAs = true;
+    }
+    
+    boolean getCheckforLocalOnlyUDAs() {
+      return this.checkForLocalOnlyUDAs;
+    }
+    
+    void putInProfile(String udaName, String udaClass) {
+      this.udas.put(udaName, udaClass);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.udas = DataSerializer.readHashMap(in);
+      this.checkForLocalOnlyUDAs = DataSerializer.readPrimitiveBoolean(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeHashMap(this.udas, out);
+      DataSerializer.writePrimitiveBoolean(this.checkForLocalOnlyUDAs, out);
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      return null;
+    }
+
+    @Override
+    public int getDSFID() {
+      return UDA_PROFILE;
+    }
+
+    @Override
+    public void processIncoming(DistributionManager dm, String adviseePath, boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
+      handleDistributionAdvisee(GemFireCacheImpl.getExisting().getUDAManager(), removeProfile, exchangeProfiles, replyProfiles);
+    }
+
+    @Override
+    public void collectProfile(DistributionManager dm, String adviseePath, final List<Profile> replyProfiles) {
+      UDAProfile udap = (UDAProfile)GemFireCacheImpl.getExisting().getUDAManager().getProfile();
+      udap.setCheckForLocalOnlyUDAsAsTrue();
+      replyProfiles.add(udap);
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
new file mode 100644
index 0000000..f6a462e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+
+/**
+ * Interface for creating & removing UDAs.
+ * 
+ * @see UDAManagerImpl
+ * @author ashahid
+ * @since 9.0
+ */
+public interface UDAManager {
+  public Map<String, String> getUDAs() ;
+  public void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException;
+  public void removeUDA(String name) ;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
new file mode 100644
index 0000000..964922e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAManagerImpl.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDADistributionAdvisor.UDAProfile;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class UDAManagerImpl implements DistributionAdvisee, UDAManager {
+
+  private ConcurrentMap<String, Class<Aggregator>> map;
+
+  private final DistributionAdvisor advisor;
+  private static final Logger logger = LogService.getLogger();
+  private static ThreadLocal<Map<String, Class<Aggregator>>> localOnlyUDAS = new ThreadLocal<Map<String, Class<Aggregator>>>();
+
+  public UDAManagerImpl() {
+    this.map = new ConcurrentHashMap<String, Class<Aggregator>>();
+    this.advisor = UDADistributionAdvisor.createUDAAdvisor(this);
+
+  }
+
+  public void createUDA(String name, String fqClass) throws UDAExistsException, NameResolutionException {
+    createUDALocally(name, fqClass);
+    new UDAMessage(name, fqClass).send();
+  }
+
+  public Map<String, String> getUDAs() {
+    Map<String, String> mapping = new HashMap<String, String>();
+    for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+      mapping.put(entry.getKey(), entry.getValue().getName());
+    }
+    return mapping;
+  }
+
+  public void createUDALocally(String name, String fqClass) throws NameResolutionException, UDAExistsException {
+    synchronized (this) {
+      if (!this.map.containsKey(name)) {
+        try {
+          Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(fqClass);
+          this.map.put(name, aggregatorClass);
+        } catch (ClassNotFoundException cnfe) {
+          throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(fqClass, name), cnfe);
+        }
+      } else {
+        throw new UDAExistsException(LocalizedStrings.UDA_MANAGER_Uda_Exists.toLocalizedString(name));
+      }
+    }
+  }
+
+  public void collectUDAsFromRemote() {
+    int numTries = 5;
+    boolean collectedUDAs = false;
+    for (int i = 0; i < numTries; ++i) {
+      try {
+        new UpdateAttributesProcessor(this).collect(DataSerializableFixedID.UDA_PROFILE);
+        collectedUDAs = true;
+        break;
+      } catch (Exception e) {
+        this.getDistributionManager().getCancelCriterion().checkCancelInProgress(e);
+      }
+    }
+    if (!collectedUDAs) {
+      if (logger.isErrorEnabled()) {
+        logger.error(LocalizedStrings.UDA_MANAGER_Udas_Not_Collected.toLocalizedString());
+      }
+    }
+  }
+
+  void addUDAs(Map<String, String> udas, boolean checkforOnlyLocal) {
+    // Get those UDAs which are present only locally
+    final Map<String, Class<Aggregator>> onlyLocalUDAs = new HashMap<String, Class<Aggregator>>();
+    if (logger.isInfoEnabled()) {
+      logger.info("UDAManagerImpl::addUDAs: adding remote collected UDAs=" + udas + " check for local only =" + checkforOnlyLocal);
+    }
+    synchronized (this) {
+      if (checkforOnlyLocal) {
+        for (Map.Entry<String, Class<Aggregator>> entry : this.map.entrySet()) {
+          if (!udas.containsKey(entry.getKey())) {
+            onlyLocalUDAs.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+      for (Map.Entry<String, String> entry : udas.entrySet()) {
+        String udaName = entry.getKey();
+        String udaClass = entry.getValue();
+        Class<Aggregator> existingUDAClass = this.map.get(udaName);
+        if (existingUDAClass == null) {
+          try {
+            Class<Aggregator> aggregatorClass = (Class<Aggregator>) Class.forName(udaClass);
+            this.map.put(udaName, aggregatorClass);
+          } catch (ClassNotFoundException cnfe) {
+            logger.error(LocalizedStrings.UDA_MANAGER_Class_Not_Found.toLocalizedString(udaClass, udaName));
+          }
+        } else {
+          // check if the classes are same
+          if (!udaClass.equals(existingUDAClass.getName())) {
+            logger.error(LocalizedStrings.UDA_MANAGER_Class_Conflict.toLocalizedString(udaName, existingUDAClass.getName(), udaClass));
+          }
+        }
+      }
+    }
+
+    if (checkforOnlyLocal && !onlyLocalUDAs.isEmpty()) {
+      if (logger.isInfoEnabled()) {
+        logger.info("UDAManagerImpl::addUDAs:Only local UDAs=" + onlyLocalUDAs);
+      }
+      localOnlyUDAS.set(onlyLocalUDAs);
+      new UpdateAttributesProcessor(UDAManagerImpl.this).sendProfileUpdate(false);
+    }
+
+  }
+
+  public Class<Aggregator> getUDAClass(String name) throws NameResolutionException {
+    Class<Aggregator> aggClass = this.map.get(name);
+    if (aggClass == null) {
+      throw new NameResolutionException(LocalizedStrings.UDA_MANAGER_Aggregator_Not_Found.toLocalizedString(name));
+    }
+    return aggClass;
+  }
+
+  public synchronized void clear() {
+    this.map.clear();
+  }
+
+  @Override
+  public void removeUDA(String udaName) {
+    this.removeUDALocally(udaName);
+    new UDAMessage(udaName).send();
+  }
+
+  void removeUDALocally(String udaName) {
+    synchronized (this) {
+      this.map.remove(udaName);
+    }
+  }
+
+  @Override
+  public DM getDistributionManager() {
+    return getSystem().getDistributionManager();
+  }
+
+  @Override
+  public CancelCriterion getCancelCriterion() {
+    return null;
+  }
+
+  @Override
+  public DistributionAdvisor getDistributionAdvisor() {
+
+    return this.advisor;
+  }
+
+  @Override
+  public Profile getProfile() {
+    return this.advisor.createProfile();
+  }
+
+  @Override
+  public DistributionAdvisee getParentAdvisee() {
+    return null;
+  }
+
+  @Override
+  public InternalDistributedSystem getSystem() {
+    return InternalDistributedSystem.getConnectedInstance();
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+
+  @Override
+  public String getFullPath() {
+    return null;
+  }
+
+  @Override
+  public void fillInProfile(Profile profile) {
+    UDAProfile udap = (UDAProfile) profile;
+    synchronized (this) {
+      Map<String, Class<Aggregator>> localOnly = localOnlyUDAS.get();
+      localOnlyUDAS.set(null);
+      Map<String, Class<Aggregator>> toSend = localOnly != null ? localOnly : this.map;
+      for (Map.Entry<String, Class<Aggregator>> entry : toSend.entrySet()) {
+        udap.putInProfile(entry.getKey(), entry.getValue().getName());
+      }
+    }
+
+  }
+
+  @Override
+  public int getSerialNumber() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
new file mode 100644
index 0000000..706499d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/uda/UDAMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.internal.aggregate.uda;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Used for sending UDA creation / removal message
+ * @author ashahid
+ * @since 9.0 
+ */
+public class UDAMessage extends DistributionMessage {
+  private boolean isCreate;
+  private String udaName;
+  private String udaClass;
+
+  public UDAMessage() {}
+
+  public UDAMessage(String name, String udaClass) {
+    this.isCreate = true;
+    this.udaName = name;
+    this.udaClass = udaClass;
+  }
+
+  public UDAMessage(String name) {
+    this.isCreate = false;
+    this.udaName = name;
+  }
+
+  @Override
+  public int getDSFID() {
+    return UDA_MESSAGE;
+  }
+
+  @Override
+  public int getProcessorType() {
+    return DistributionManager.SERIAL_EXECUTOR;
+  }
+
+  @Override
+  protected void process(DistributionManager dm) {
+    if (this.isCreate) {
+      try {
+        GemFireCacheImpl.getExisting().getUDAManager().createUDALocally(this.udaName, this.udaClass);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    } else {
+      GemFireCacheImpl.getExisting().getUDAManager().removeUDALocally(this.udaName);
+    }
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.isCreate = DataSerializer.readPrimitiveBoolean(in);
+    this.udaName = DataSerializer.readString(in);
+    if (this.isCreate) {
+      this.udaClass = DataSerializer.readString(in);
+    }
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writePrimitiveBoolean(this.isCreate, out);
+    DataSerializer.writeString(this.udaName, out);
+    if (this.isCreate) {
+      DataSerializer.writeString(this.udaClass, out);
+    }
+  }
+
+  public void send() {
+    GemFireCacheImpl gfc = GemFireCacheImpl.getExisting();
+    DistributionAdvisor advisor = gfc.getDistributionAdvisor();
+    final Set<InternalDistributedMember> recipients = new HashSet<InternalDistributedMember>(advisor.adviseGeneric());
+    recipients.remove(gfc.getDistributionManager().getId());
+    this.setRecipients(recipients);
+    gfc.getDistributionManager().putOutgoing(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
new file mode 100644
index 0000000..60d862c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/UDAManagerCreation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.xmlcache;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+
+public class UDAManagerCreation implements UDAManager {
+  private Map<String, String> map = new HashMap<String, String>();
+  
+  @Override
+  public Map<String, String> getUDAs() {
+    return Collections.unmodifiableMap(this.map);
+  }
+  
+  @Override
+  public void createUDA(String name, String fqClass) {
+    this.map.put(name, fqClass);  
+  }
+  
+  @Override
+  public void removeUDA(String name) {
+    this.map.remove(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
new file mode 100644
index 0000000..2abcce2
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDACreationDUnitTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl.SumUDA;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass1;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass2;
+import com.gemstone.gemfire.cache30.CacheXmlGeode10DUnitTest.UDACLass3;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
+import com.gemstone.gemfire.internal.cache.xmlcache.ClientCacheCreation;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDACreationDUnitTest extends CacheTestCase {
+  public UDACreationDUnitTest(String name) {
+    super(name);
+  }
+
+  @Test
+  public void testUDACreationThroughProfileExchange() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    createCache(vm1, vm2, vm3);
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testUDACreationThroughMessage() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    createCache(vm1, vm2, vm3);
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testUDARemovalThroughMessage() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = getCache();
+    createCache(vm1, vm2, vm3);
+    final String udaName = "uda1";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA(udaName, "com.gemstone.gemfire.cache.query.dunit.UDACreationDUnitTest$SumUDA");
+    validateUDAExists(udaName, vm1, vm2, vm3);
+    qs.removeUDA(udaName);
+    validateUDADoesNotExists(udaName, vm1, vm2, vm3);
+    this.closeCache(vm1, vm2, vm3);
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testUDAProfileMerge() {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+
+    final CacheCreation cacheCreation1 = new CacheCreation();
+    cacheCreation1.addUDA("uda1", UDACLass1.class.getName());
+    Helper helper = new Helper();
+    helper.createCacheThruXML(cacheCreation1);
+    final Cache c = getCache();
+    assertNotNull(c);
+    UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+
+    // Now in VM1 create another cache through XMl containing uda2
+    vm1.invoke(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+
+        final CacheCreation cacheCreation2 = new CacheCreation();
+        cacheCreation2.addUDA("uda2", UDACLass2.class.getName());
+        cacheCreation2.addUDA("uda3", UDACLass3.class.getName());
+        Helper helper = new Helper();
+        helper.createCacheThruXML(cacheCreation2);
+        final Cache c = getCache();
+
+      }
+    });
+    // This VM should also have 3 UDAs at the end of intialization of remote vm
+
+    map = udaMgr.getUDAs();
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+    assertEquals(map.get("uda2"), UDACLass2.class.getName());
+    assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+    vm1.invoke(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+        // validate at the end of intialization, there exists 3 UDAs
+        UDAManager udaMgr = ((GemFireCacheImpl) getCache()).getUDAManager();
+        Map<String, String> map = udaMgr.getUDAs();
+        assertEquals(map.get("uda1"), UDACLass1.class.getName());
+        assertEquals(map.get("uda2"), UDACLass2.class.getName());
+        assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+      }
+    });
+
+  }
+  
+  
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testUDAProfileMergeMultipleVMs() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    AsyncInvocation one = this.createCacheWithUDAAsynchronously("uda1", UDACLass1.class.getName(), vm1);
+    AsyncInvocation two = this.createCacheWithUDAAsynchronously("uda2", UDACLass2.class.getName(), vm2);
+    AsyncInvocation three = this.createCacheWithUDAAsynchronously("uda3", UDACLass3.class.getName(), vm3);
+   
+    final Cache c = getCache();
+    assertNotNull(c);
+    one.join();
+    two.join();
+    three.join();
+    UDAManager udaMgr = ((GemFireCacheImpl) c).getUDAManager();
+    Map<String, String> map = udaMgr.getUDAs();   
+    assertEquals(map.get("uda1"), UDACLass1.class.getName());
+    assertEquals(map.get("uda2"), UDACLass2.class.getName());
+    assertEquals(map.get("uda3"), UDACLass3.class.getName());
+
+    validateUDAExists("uda1", vm1, vm2, vm3);
+    validateUDAExists("uda2", vm1, vm2, vm3);
+    validateUDAExists("uda3", vm1, vm2, vm3);
+
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  private void createCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("create cache") {
+        public void run() {
+          Cache cache = getCache();
+        }
+      });
+    }
+  }
+
+  private void validateUDAExists(final String udaName, VM... vms) {
+    try {
+      Class<Aggregator> udaClass = ((GemFireCacheImpl) this.getCache()).getUDAManager().getUDAClass(udaName);
+      assertNotNull(udaClass);
+    } catch (NameResolutionException nre) {
+      throw new RuntimeException(nre);
+    }
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("validate UDA exists") {
+        public void run() {
+          try {
+            Class<Aggregator> udaClass = ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+            assertNotNull(udaClass);
+          } catch (NameResolutionException nre) {
+            throw new RuntimeException(nre);
+          }
+
+        }
+      });
+    }
+  }
+  
+  private AsyncInvocation createCacheWithUDAAsynchronously(final String udaName, 
+      final String udaClass, VM vm) {
+    // Now in VM1 create another cache through XMl containing uda2
+    return vm.invokeAsync(new SerializableRunnable("Create Cache in other VM") {
+      public void run() {
+        final CacheCreation cacheCreation2 = new CacheCreation();
+        cacheCreation2.addUDA(udaName, udaClass);       
+        Helper helper = new Helper();
+        helper.createCacheThruXML(cacheCreation2);
+        final Cache c = getCache();
+
+      }
+    });
+  }
+
+  private void validateUDADoesNotExists(final String udaName, VM... vms) {
+    try {
+      Class<Aggregator> udaClass = ((GemFireCacheImpl) this.getCache()).getUDAManager().getUDAClass(udaName);
+      fail("UDA should not exist");
+    } catch (NameResolutionException nre) {
+      // OK
+    }
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable("validate UDA exists") {
+        public void run() {
+          try {
+            Class<Aggregator> udaClass = ((GemFireCacheImpl) getCache()).getUDAManager().getUDAClass(udaName);
+            fail("UDA should not exist");
+          } catch (NameResolutionException nre) {
+            // OK
+          }
+
+        }
+      });
+    }
+  }
+
+  private void closeCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+  protected String getGemFireVersion() {
+    return CacheXml.VERSION_1_0;
+  }
+
+  public static class SumUDA implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+
+    }
+
+  }
+
+  public class UDACLass1 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public class UDACLass2 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public class UDACLass3 implements Aggregator {
+
+    @Override
+    public void accumulate(Object value) {
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public Object terminate() {
+      return null;
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+    }
+
+  }
+
+  public static class Helper extends CacheXmlTestCase {
+    public Helper() {
+      super("UDACreationDUnitTest:testUDAProfileMerge");
+      CacheXmlTestCase.lonerDistributedSystem = false;
+    }
+
+    public void createCacheThruXML(CacheCreation creation) {
+      this.testXml(creation, true);
+    }
+
+    @Override
+    protected String getGemFireVersion() {
+      return CacheXml.VERSION_1_0;
+    }
+
+    @Override
+    protected boolean getUseSchema() {
+      return true;
+    }
+    
+    @Test
+    public void testDummy(){}
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
new file mode 100644
index 0000000..7f80657
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDADUnitImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.query.Index;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.IndexType;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestInterface;
+import com.gemstone.gemfire.cache.query.functional.NonDistinctOrderByTestImplementation;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * 
+ * @author ashahid
+ *
+ */
+public abstract class UDADUnitImpl extends CacheTestCase implements UDATestInterface{
+
+
+  public UDADUnitImpl(String name) {
+    super(name);
+  }
+
+  protected abstract UDATestInterface createTestInstance();
+
+  @Test
+  @Override
+  public void testUDANoGroupBy() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    UDATestInterface test = createTestInstance();
+    test.testUDANoGroupBy();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  @Override
+  public void testUDAWithGroupBy() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    UDATestInterface test = createTestInstance();
+    test.testUDAWithGroupBy();;
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+ 
+  
+  protected void createIndex(VM vm, final String indexName,
+      final String indexedExpression, final String regionPath) {
+    vm.invoke(new SerializableRunnable("create index") {
+      public void run() {
+        try {
+          Cache cache = getCache();
+          cache.getQueryService().createIndex(indexName, indexedExpression,
+              regionPath);
+        } catch (RegionNotFoundException e) {
+          fail(e.toString());
+        } catch (IndexExistsException e) {
+          fail(e.toString());
+        } catch (IndexNameConflictException e) {
+          fail(e.toString());
+        }
+      }
+    });
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    disconnectAllFromDS();
+  }
+  
+
+  private void closeCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
new file mode 100644
index 0000000..ba0ce9a
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/UDAPartitionedQueryDUnitTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.dunit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.query.functional.GroupByTestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestImpl;
+import com.gemstone.gemfire.cache.query.functional.UDATestInterface;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+@Category(DistributedTest.class)
+public class UDAPartitionedQueryDUnitTest extends UDADUnitImpl {
+
+
+  public UDAPartitionedQueryDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  protected UDATestInterface createTestInstance() {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+
+    UDATestImpl test = new UDATestImpl() {
+
+      @Override
+      public Region createRegion(String regionName, Class valueConstraint) {
+        // TODO Auto-generated method stub
+        Region rgn = createAccessor(regionName, valueConstraint);
+        createPR(vm1, regionName, valueConstraint);
+        createPR(vm2, regionName, valueConstraint);
+        createPR(vm3, regionName, valueConstraint);
+        return rgn;
+      }
+    };
+    return test;
+  }
+
+  private void createBuckets(VM vm) {
+    vm.invoke(new SerializableRunnable("create accessor") {
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region");
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+  }
+
+  private void createPR(VM vm, final String regionName,
+      final Class valueConstraint) {
+    vm.invoke(new SerializableRunnable("create data store") {
+      public void run() {
+        Cache cache = getCache();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setTotalNumBuckets(10);
+        cache.createRegionFactory(RegionShortcut.PARTITION)
+            .setValueConstraint(valueConstraint)
+            .setPartitionAttributes(paf.create()).create(regionName);
+      }
+    });
+  }
+
+  private Region createAccessor(String regionName, Class valueConstraint) {
+
+    Cache cache = getCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(10);
+    paf.setLocalMaxMemory(0);
+    return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        .setValueConstraint(valueConstraint)
+        .setPartitionAttributes(paf.create()).create(regionName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
new file mode 100644
index 0000000..1f5b3e7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAPartitionedJUnitTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class UDAPartitionedJUnitTest extends UDATestImpl {
+
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    AttributesFactory af = new AttributesFactory();
+    af.setPartitionAttributes(paf.create());
+    af.setValueConstraint(valueConstraint);
+    Region r1 = CacheUtils.createRegion(regionName, af.create(), false);
+    return r1;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
new file mode 100644
index 0000000..9db76ab
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDAReplicatedJUnitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class UDAReplicatedJUnitTest extends UDATestImpl {
+
+  @Override
+  public Region createRegion(String regionName, Class valueConstraint) {
+    Region r1 = CacheUtils.createRegion(regionName, valueConstraint);
+    return r1;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
new file mode 100644
index 0000000..7b59243
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.Aggregator;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+
+/**
+ * Tests the group by queries with or without aggreagte functions
+ * 
+ * @author Asif
+ *
+ *
+ */
+public abstract class UDATestImpl implements UDATestInterface {
+
+ 
+  public abstract Region createRegion(String regionName, Class constraint);
+
+  @Test
+  public void testUDANoGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sum = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      sum += pf.ID;
+    }
+    String queryStr = "select myUDA(p.ID) from /portfolio p where p.ID > 0  ";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    assertEquals(sum, ((Integer)sr.asList().get(0)).intValue());
+  }
+
+  @Test
+  public void testUDAWithGroupBy() throws Exception {
+    Region region = this.createRegion("portfolio", Portfolio.class);
+    int sumActive = 0;
+    int sumInactive = 0;
+    for (int i = 1; i < 200; ++i) {
+      Portfolio pf = new Portfolio(i);
+      pf.shortID = (short) ((short) i / 5);
+      region.put("" + i, pf);
+      if(pf.status.equals("active")) {
+        sumActive += pf.ID;
+      }else {
+        sumInactive += pf.ID;
+      }
+      
+    }
+    String queryStr = "select p.status , myUDA(p.ID) from /portfolio p where p.ID > 0 group by p.status order by p.status";
+    QueryService qs = CacheUtils.getQueryService();
+    qs.createUDA("myUDA", "com.gemstone.gemfire.cache.query.functional.UDATestImpl$SumUDA");
+    Query query = qs.newQuery(queryStr);
+    SelectResults sr = (SelectResults) query.execute();
+    List<Struct> structs = (List<Struct>)sr.asList();
+    assertEquals(2, structs.size());
+    assertTrue(structs.get(0).getFieldValues()[0].equals("active"));
+    assertEquals(sumActive, ((Integer)structs.get(0).getFieldValues()[1]).intValue());
+    
+    assertTrue(structs.get(1).getFieldValues()[0].equals("inactive"));
+    assertEquals(sumInactive, ((Integer)structs.get(1).getFieldValues()[1]).intValue());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CacheUtils.startCache();
+    CacheUtils.getCache();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CacheUtils.closeCache();
+  }
+  
+  public static class SumUDA implements Aggregator, Serializable {
+
+    private int sum =0;
+    
+    public SumUDA() {
+      
+    }
+    @Override
+    public void accumulate(Object value) {
+      sum += ((Integer)value).intValue();
+      
+    }
+
+    @Override
+    public void init() {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public Object terminate() {      
+      return Integer.valueOf(sum);
+    }
+
+    @Override
+    public void merge(Aggregator otherAggregator) {
+      SumUDA uda = (SumUDA)otherAggregator;
+      this.sum += uda.sum;      
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/42fb6fc0/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
new file mode 100644
index 0000000..f7c9e40
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/functional/UDATestInterface.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.query.functional;
+
+public interface UDATestInterface {
+  public void testUDANoGroupBy() throws Exception ;  
+  public void  testUDAWithGroupBy() throws Exception ;
+}


[3/6] incubator-geode git commit: Implementing the UDA functionality in the OQL engine

Posted by up...@apache.org.
Implementing the UDA functionality in the OQL engine


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4f85cac9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4f85cac9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4f85cac9

Branch: refs/heads/feature/GEODE-1269
Commit: 4f85cac959a20e81ec842307b7dd27faeb5041d8
Parents: f702bcf
Author: Asif Shahid <as...@snappydata.io>
Authored: Thu Apr 14 22:01:41 2016 -0700
Committer: Asif Shahid <as...@snappydata.io>
Committed: Thu Apr 14 22:01:41 2016 -0700

----------------------------------------------------------------------
 .../gemfire/cache/query/Aggregator.java         |  20 +-
 .../gemfire/cache/query/QueryService.java       |  22 ++
 .../internal/CompiledAggregateFunction.java     |  60 +--
 .../query/internal/CompiledGroupBySelect.java   |  39 +-
 .../query/internal/DefaultQueryService.java     |  14 +
 .../cache/query/internal/ProxyQueryService.java |  12 +
 .../gemfire/cache/query/internal/QCompiler.java |   4 +
 .../internal/aggregate/AbstractAggregator.java  |  23 +-
 .../cache/query/internal/aggregate/Avg.java     |  34 +-
 .../query/internal/aggregate/AvgDistinct.java   |   4 +-
 .../cache/query/internal/aggregate/Count.java   |  41 +-
 .../query/internal/aggregate/CountDistinct.java |   4 +-
 .../internal/aggregate/DistinctAggregator.java  |  42 +-
 .../cache/query/internal/aggregate/MaxMin.java  |  39 +-
 .../cache/query/internal/aggregate/Sum.java     |  40 +-
 .../query/internal/aggregate/SumDistinct.java   |   6 +-
 .../query/internal/parse/ASTAggregateFunc.java  |  12 +-
 .../cache/query/internal/parse/OQLLexer.java    | 164 ++++----
 .../internal/parse/OQLLexerTokenTypes.java      | 169 ++++----
 .../query/internal/parse/OQLLexerTokenTypes.txt | 169 ++++----
 .../cache/query/internal/parse/OQLParser.java   | 387 ++++++++++---------
 .../gemfire/cache/query/internal/parse/oql.g    |  13 +-
 .../internal/DistributionAdvisor.java           |  13 +
 .../gemstone/gemfire/internal/DSFIDFactory.java |  83 ++--
 .../internal/DataSerializableFixedID.java       |  10 +-
 .../internal/cache/GemFireCacheImpl.java        |  12 +-
 .../gemfire/internal/cache/InternalCache.java   |   3 +
 .../cache/PartitionedRegionQueryEvaluator.java  |   8 +-
 .../cache/UpdateAttributesProcessor.java        | 146 +++++++
 .../internal/cache/xmlcache/CacheCreation.java  |  41 +-
 .../internal/cache/xmlcache/CacheXml.java       |   5 +
 .../cache/xmlcache/CacheXmlGenerator.java       |  26 +-
 .../internal/cache/xmlcache/CacheXmlParser.java |  29 ++
 .../internal/i18n/ParentLocalizedStrings.java   |  10 +-
 .../geode.apache.org/schema/cache/cache-1.0.xsd |  12 +
 .../dunit/GroupByPartitionedQueryDUnitTest.java |   6 -
 .../CompiledAggregateFunctionJUnitTest.java     |  46 +--
 .../internal/aggregate/AggregatorJUnitTest.java | 193 +++++----
 .../cache30/CacheXmlGeode10DUnitTest.java       |  74 ++++
 .../gemfire/cache30/CacheXmlTestCase.java       |   2 +-
 .../website/content/schema/cache/cache-1.0.xsd  |  13 +
 41 files changed, 1361 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/Aggregator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/Aggregator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/Aggregator.java
index 6991e53..b43f9ff 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/Aggregator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/Aggregator.java
@@ -21,7 +21,17 @@ package com.gemstone.gemfire.cache.query;
  * result. In addition to the methods in the interface, implementing classes
  * must have a 0-arg public constructor.
  * 
- *
+ * For replicated Regions, it is necessary to implement required functionality in {@link #accumulate(Object)} and {@link #terminate()}
+ * 
+ * For PartitionedRegions, the aggregator Objects are themselves serialized from the
+ * bucket nodes to the query node. On the query node, the aggregators are merged
+ * in {@link #merge(Aggregator)}
+ * 
+ * For PartitionedRegions, the aggregator class needs to be serializable
+ * 
+ * 
+ * @author ashahid
+ * @since 9.0
  */
 public interface Aggregator {
 
@@ -42,4 +52,12 @@ public interface Aggregator {
    * @return Return the result scalar value
    */
   public Object terminate();
+
+  /**
+   * Merges the incoming aggregator from bucket nodes with the resultant aggregator
+   * on the query node
+   * 
+   * @param otherAggregator
+   */
+  public void merge(Aggregator otherAggregator);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/QueryService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/QueryService.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/QueryService.java
index 3a98a43..3be7e80 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/QueryService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/QueryService.java
@@ -847,5 +847,27 @@ public interface QueryService {
    * 
    */
   public CqServiceStatistics getCqStatistics();
+  
+  /**
+   * Creates a UserDefinedAggregate ( UDA ) which can be referenced in querying.
+   * The UDA class must implement {@link Aggregator} interface.
+   * This call defines the UDA through out the system , getting executed on all peers     
+   * @param udaName String alias with which the UDA class is refered to
+   * @param udaClass Fully qualified UDA class name
+   * @throws UDAExistsException If the system has already a UDA defined with the udaName
+   * @throws NameResolutionException If system is unable to load the UDA class using the class name
+   * 
+   * @since 9.0
+   */
+  public void createUDA(String udaName, String udaClass) throws UDAExistsException, NameResolutionException;
+  
+  /**
+   * Removes the UDA defined in system with name
+   * This call removes the UDA mapping from the system ,  getting executed on all peers
+   * @param udaName String alias with which the UDA is known
+   * 
+   * @since 9.0
+   */
+  public void removeUDA(String udaName) ;
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunction.java
index 87f29fa..cc63c8a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunction.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledAggregateFunction.java
@@ -16,23 +16,17 @@
  */
 package com.gemstone.gemfire.cache.query.internal;
 
+import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.AmbiguousNameException;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
 import com.gemstone.gemfire.cache.query.NameResolutionException;
 import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
 import com.gemstone.gemfire.cache.query.TypeMismatchException;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgBucketNode;
+import com.gemstone.gemfire.cache.query.internal.aggregate.Avg;
 import com.gemstone.gemfire.cache.query.internal.aggregate.AvgDistinct;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgDistinctPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.AvgPRQueryNode;
 import com.gemstone.gemfire.cache.query.internal.aggregate.Count;
 import com.gemstone.gemfire.cache.query.internal.aggregate.CountDistinct;
-import com.gemstone.gemfire.cache.query.internal.aggregate.CountDistinctPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.SumDistinctPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.CountPRQueryNode;
-import com.gemstone.gemfire.cache.query.internal.aggregate.DistinctAggregator;
 import com.gemstone.gemfire.cache.query.internal.aggregate.MaxMin;
-import com.gemstone.gemfire.cache.query.internal.aggregate.Avg;
 import com.gemstone.gemfire.cache.query.internal.aggregate.Sum;
 import com.gemstone.gemfire.cache.query.internal.aggregate.SumDistinct;
 import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
@@ -40,6 +34,7 @@ import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
 import com.gemstone.gemfire.cache.query.types.ObjectType;
 
 /**
+ * Represents the  built-in aggregate function node
  * 
  *
  */
@@ -67,20 +62,12 @@ public class CompiledAggregateFunction extends AbstractCompiledValue {
   }
 
   @Override
-  public Object evaluate(ExecutionContext context)
-      throws FunctionDomainException, TypeMismatchException,
-      NameResolutionException, QueryInvocationTargetException {
-    boolean isPRQueryNode = context.getIsPRQueryNode();
-    boolean isBucketNode = context.getBucketList() != null;
+  public Aggregator evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+                                                      QueryInvocationTargetException {
     switch (this.aggFuncType) {
 
     case OQLLexerTokenTypes.SUM:
-      if (isPRQueryNode) {
-        return this.distinctOnly ? new SumDistinctPRQueryNode() : new Sum();
-      } else {
-        return this.distinctOnly ? (isBucketNode ? new DistinctAggregator()
-            : new SumDistinct()) : new Sum();
-      }
+      return this.distinctOnly ? new SumDistinct() : new Sum();
 
     case OQLLexerTokenTypes.MAX:
       return new MaxMin(true);
@@ -89,33 +76,21 @@ public class CompiledAggregateFunction extends AbstractCompiledValue {
       return new MaxMin(false);
 
     case OQLLexerTokenTypes.AVG:
-      if (isPRQueryNode) {
-        return this.distinctOnly ? new AvgDistinctPRQueryNode()
-            : new AvgPRQueryNode();
-      } else {
-        return this.distinctOnly ? (isBucketNode ? new DistinctAggregator()
-            : new AvgDistinct()) : (isBucketNode ? new AvgBucketNode()
-            : new Avg());
-      }
+
+      return this.distinctOnly ? new AvgDistinct() : new Avg();
 
     case OQLLexerTokenTypes.COUNT:
-      if (isPRQueryNode) {
-        return this.distinctOnly ? new CountDistinctPRQueryNode()
-            : new CountPRQueryNode();
-      } else {
-        return this.distinctOnly ? (isBucketNode ? new DistinctAggregator()
-            : new CountDistinct()) : new Count();
-      }
+
+      return this.distinctOnly ? new CountDistinct() : new Count();
 
     default:
-      throw new UnsupportedOperationException(
-          "Aggregate function not implemented");
+      throw new UnsupportedOperationException("Aggregate function not implemented");
 
     }
 
   }
 
-  private String getStringRep() {
+  protected String getStringRep() {
     switch (this.aggFuncType) {
 
     case OQLLexerTokenTypes.SUM:
@@ -132,8 +107,7 @@ public class CompiledAggregateFunction extends AbstractCompiledValue {
     case OQLLexerTokenTypes.COUNT:
       return "count";
     default:
-      throw new UnsupportedOperationException(
-          "Aggregate function not implemented");
+      throw new UnsupportedOperationException("Aggregate function not implemented");
 
     }
   }
@@ -159,16 +133,14 @@ public class CompiledAggregateFunction extends AbstractCompiledValue {
       return new ObjectTypeImpl(Integer.class);
 
     default:
-      throw new UnsupportedOperationException(
-          "Aggregate function not implemented");
+      throw new UnsupportedOperationException("Aggregate function not implemented");
 
     }
   }
 
   @Override
-  public void generateCanonicalizedExpression(StringBuffer clauseBuffer,
-      ExecutionContext context) throws AmbiguousNameException,
-      TypeMismatchException, NameResolutionException {
+  public void generateCanonicalizedExpression(StringBuffer clauseBuffer, ExecutionContext context) throws AmbiguousNameException, TypeMismatchException,
+                                                                                                  NameResolutionException {
     clauseBuffer.insert(0, ')');
     if (this.expr != null) {
       this.expr.generateCanonicalizedExpression(clauseBuffer, context);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledGroupBySelect.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledGroupBySelect.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledGroupBySelect.java
index 59e228f..daa75bf 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledGroupBySelect.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledGroupBySelect.java
@@ -181,6 +181,7 @@ public class CompiledGroupBySelect extends CompiledSelect {
     ObjectType elementType = baseResults.getCollectionType().getElementType();
     boolean isStruct = elementType != null && elementType.isStructType();
     boolean isBucketNodes = context.getBucketList() != null;
+    boolean isPRQueryNode = context.getIsPRQueryNode();
     boolean createOrderedResultSet = isBucketNodes && this.orderByAttrs != null;
     boolean[] objectChangedMarker = new boolean[]{false};
     int limitValue = evaluateLimitValue(context, limit);
@@ -197,7 +198,7 @@ public class CompiledGroupBySelect extends CompiledSelect {
       boolean unterminated = iter.hasNext();
       while (iter.hasNext()) {
         current = iter.next();
-        accumulate(isStruct, aggregators, current, objectChangedMarker);
+        accumulate(isStruct, aggregators, current, objectChangedMarker, isPRQueryNode);
       }
       if (unterminated) {
         this.terminateAndAddToResults(isStruct, newResults, aggregators,
@@ -286,8 +287,8 @@ public class CompiledGroupBySelect extends CompiledSelect {
     Object[] orderByTupleHolderCurrent = null;
     Object[] orderByTupleHolderPrev = null;
     Object orderByCurrent = null;
-    Object orderByPrev = null;
-   
+    Object orderByPrev = null;  
+    boolean isPRQueryNode = context.getIsPRQueryNode();
     boolean isSingleOrderBy = this.orderByAttrs.size() <= 1;
     if (!isSingleOrderBy) {
       orderByTupleHolderPrev = new Object[orderByAttrs.size()];
@@ -311,13 +312,13 @@ public class CompiledGroupBySelect extends CompiledSelect {
       if (isFirst
           || areOrderByTupleEqual(isSingleOrderBy, orderByPrev, orderByCurrent,
               orderByTupleHolderPrev, orderByTupleHolderCurrent)) {
-        accumulate(isStruct, aggregators, current, objectChangedMarker);
+        accumulate(isStruct, aggregators, current, objectChangedMarker, isPRQueryNode);
         unterminated = true;
         isFirst = false;
       } else {
         keepAdding = terminateAndAddToResults(isStruct, newResults, aggregators, prev,
             context, isStructFields, limitValue);
-        this.accumulate(isStruct, aggregators, current, objectChangedMarker);
+        this.accumulate(isStruct, aggregators, current, objectChangedMarker, isPRQueryNode);
         unterminated = true;
       }
       // swap the holder arrays
@@ -350,15 +351,16 @@ public class CompiledGroupBySelect extends CompiledSelect {
     if(limitValue == 0) {
       return false;
     }
-    
+    boolean isBucketNodes = context.getBucketList() != null;
+   
     for (Aggregator aggregator : aggregators) {
       if (isStruct) {
         int pos = this.aggregateColsPos.nextSetBit(bitstart);
         bitstart = pos + 1;
-        Object scalarResult = aggregator.terminate();
+        Object scalarResult = isBucketNodes? aggregator : aggregator.terminate();
         newRowArray[pos] = scalarResult;
       } else {
-        newObject = aggregator.terminate();
+        newObject = isBucketNodes? aggregator: aggregator.terminate();
       }
     }
     
@@ -402,19 +404,28 @@ public class CompiledGroupBySelect extends CompiledSelect {
   }
 
   private void accumulate(boolean isStruct, Aggregator[] aggregators,
-      Object current, boolean[] objectChangedMarker) {
+      Object current, boolean[] objectChangedMarker,
+      boolean isPRQueryNode) {
     int bitstart = 0;
     for (Aggregator aggregator : aggregators) {
       if (isStruct) {
         int pos = this.aggregateColsPos.nextSetBit(bitstart);
         bitstart = pos + 1;
         Struct struct = (Struct) current;
-        Object scalar = PDXUtils.convertPDX(struct.getFieldValues()[pos], false, true, true, true, objectChangedMarker, isStruct);
-        
-        aggregator.accumulate(scalar);
+        Object scalar = PDXUtils.convertPDX(struct.getFieldValues()[pos], false, true, true, 
+            true, objectChangedMarker, isStruct);
+        if(isPRQueryNode) {
+          aggregator.merge((Aggregator)scalar);
+        }else {
+          aggregator.accumulate(scalar);
+        }
       } else {
         current =   PDXUtils.convertPDX(current, false, true, true, true, objectChangedMarker, isStruct);
-        aggregator.accumulate(current);
+        if(isPRQueryNode) {
+          aggregator.merge((Aggregator)current);
+        }else {
+          aggregator.accumulate(current);
+        }
       }
     }
   }
@@ -503,7 +514,7 @@ public class CompiledGroupBySelect extends CompiledSelect {
           }
         }
 
-        // the grpup by expr is not an alias check for path
+        // the group by expr is not an alias check for path
         StringBuffer groupByExprBuffer = new StringBuffer();
         grpBy.generateCanonicalizedExpression(groupByExprBuffer, context);
         final String grpByExprStr = groupByExprBuffer.toString();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/DefaultQueryService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/DefaultQueryService.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/DefaultQueryService.java
index 392cb9d..b1fde78 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/DefaultQueryService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/DefaultQueryService.java
@@ -56,6 +56,8 @@ import com.gemstone.gemfire.cache.query.QueryInvalidException;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
+import com.gemstone.gemfire.cache.query.internal.aggregate.uda.UDAManager;
 import com.gemstone.gemfire.cache.query.internal.cq.ClientCQ;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
@@ -67,6 +69,7 @@ import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
 import com.gemstone.gemfire.cache.query.internal.index.PartitionedIndex;
 import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
 import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.InternalCache;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
@@ -997,5 +1000,16 @@ public class DefaultQueryService implements QueryService {
   public InternalPool getPool() {
     return pool;
   }
+
+  @Override
+  public void createUDA(String udaName, String udaClass) throws UDAExistsException, NameResolutionException {
+    ((GemFireCacheImpl)this.cache).getUDAManager().createUDA(udaName, udaClass);
+    
+  }
+
+  @Override
+  public void removeUDA(String udaName) {
+    ((GemFireCacheImpl)this.cache).getUDAManager().removeUDA(udaName);
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/ProxyQueryService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/ProxyQueryService.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/ProxyQueryService.java
index 60ab9b4..9ffea02 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/ProxyQueryService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/ProxyQueryService.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.cache.query.Query;
 import com.gemstone.gemfire.cache.query.QueryInvalidException;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.UDAExistsException;
 import com.gemstone.gemfire.cache.query.internal.cq.ClientCQ;
 import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -448,4 +449,15 @@ public class ProxyQueryService implements QueryService {
     UserAttributes.userAttributes.set(null);
   }
 
+  @Override
+  public void createUDA(String udaName, String udaClass) throws UDAExistsException {
+    throw new UnsupportedOperationException("UDA creation on server is not supported from the client");
+  }
+
+  @Override
+  public void removeUDA(String udaName) {
+    throw new UnsupportedOperationException("UDA removal on server is not supported from the client");
+    
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/QCompiler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/QCompiler.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/QCompiler.java
index b43d082..a6872a1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/QCompiler.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/QCompiler.java
@@ -316,6 +316,10 @@ public class QCompiler implements OQLLexerTokenTypes {
     push (new CompiledAggregateFunction(expr, aggFuncType, distinctOnly));
   }
   
+  public void uda (CompiledValue expr,int aggFuncType, String name) {
+    push (new CompiledUDAFunction(expr, aggFuncType, name ));
+  }
+  
   public void iteratorDef () {
     // find type id  and colln on the stack
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AbstractAggregator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AbstractAggregator.java
index 0d56a3a..efd224f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AbstractAggregator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AbstractAggregator.java
@@ -16,6 +16,8 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.math.BigDecimal;
+
 import com.gemstone.gemfire.cache.query.Aggregator;
 
 /**
@@ -24,23 +26,20 @@ import com.gemstone.gemfire.cache.query.Aggregator;
  *
  */
 public abstract class AbstractAggregator implements Aggregator {
-
   public static Number downCast(double value) {
     Number retVal;
-    if (value % 1 == 0) {
-      long longValue = (long) value;
-      if (longValue <= Integer.MAX_VALUE && longValue >= Integer.MIN_VALUE) {
-        retVal = Integer.valueOf((int) longValue);
+    BigDecimal db = new BigDecimal(value);
+    try {
+      long val = db.longValueExact();
+      if (val <= Integer.MAX_VALUE && val >= Integer.MIN_VALUE) {
+        retVal = Integer.valueOf((int) val);
       } else {
-        retVal = Long.valueOf(longValue);
-      }
-    } else {
-      if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
-        retVal = Float.valueOf((float) value);
-      } else {
-        retVal = Double.valueOf(value);
+        retVal = Long.valueOf(val);
       }
+    } catch (ArithmeticException se) {
+      retVal = Double.valueOf(value);
     }
+
     return retVal;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Avg.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Avg.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Avg.java
index 7a0f00a..4a76288 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Avg.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Avg.java
@@ -16,6 +16,12 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.QueryService;
 
 /**
@@ -26,6 +32,9 @@ import com.gemstone.gemfire.cache.query.QueryService;
 public class Avg extends Sum {
   private int num = 0;
 
+  public Avg() {
+  }
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -36,7 +45,6 @@ public class Avg extends Sum {
 
   @Override
   public void init() {
-
   }
 
   @Override
@@ -46,4 +54,28 @@ public class Avg extends Sum {
     return downCast(result);
   }
 
+  @Override
+  public void merge(Aggregator aggregator) {
+    Avg avg = (Avg) aggregator;
+    this.num += avg.num;
+    super.merge(aggregator);
+  }
+
+  @Override
+  public int getDSFID() {
+    return AGG_FUNC_AVG;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writePrimitiveInt(this.num, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.num = DataSerializer.readPrimitiveInt(in);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinct.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinct.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinct.java
index 4548731..0613a9a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinct.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/AvgDistinct.java
@@ -25,6 +25,9 @@ import com.gemstone.gemfire.cache.query.QueryService;
  */
 public class AvgDistinct extends SumDistinct {
 
+  public AvgDistinct() {
+  }
+
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -38,5 +41,4 @@ public class AvgDistinct extends SumDistinct {
     double result = sum / this.distinct.size();
     return downCast(result);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Count.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Count.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Count.java
index 8992150..279b3cd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Count.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Count.java
@@ -16,8 +16,15 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
 
 /**
  * Computes the count of the non distinct rows for replicated & PR based
@@ -25,9 +32,12 @@ import com.gemstone.gemfire.cache.query.QueryService;
  * 
  *
  */
-public class Count implements Aggregator {
-  private int count = 0;
+public class Count extends AbstractAggregator implements DataSerializableFixedID {
+  private long count = 0;
 
+  public Count(){    
+  }
+  
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -37,12 +47,35 @@ public class Count implements Aggregator {
 
   @Override
   public void init() {
-
   }
 
   @Override
   public Object terminate() {
-    return Integer.valueOf(count);
+    return downCast(count);
+  }
+  
+  @Override 
+  public void merge(Aggregator countAgg) {
+    this.count += ((Count)countAgg).count;
   }
 
+  @Override
+  public Version[] getSerializationVersions() {   
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {    
+    return AGG_FUNC_COUNT;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writePrimitiveLong(this.count, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {    
+    this.count = DataSerializer.readPrimitiveLong(in);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinct.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinct.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinct.java
index c878a24..2ac4f91 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinct.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/CountDistinct.java
@@ -24,9 +24,11 @@ package com.gemstone.gemfire.cache.query.internal.aggregate;
 
 public class CountDistinct extends DistinctAggregator {
 
+  public CountDistinct() {
+  }
+
   @Override
   public Object terminate() {
     return Integer.valueOf(this.distinct.size());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/DistinctAggregator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/DistinctAggregator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/DistinctAggregator.java
index 2720897..fdccbb5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/DistinctAggregator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/DistinctAggregator.java
@@ -16,19 +16,26 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.HashSet;
-import java.util.Set;
 
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
 
 /**
  * The class used to hold the distinct values. This will get instantiated on the
  * bucket node as part of distinct queries for sum, count, average.
  * 
+ * 
  *
  */
-public class DistinctAggregator extends AbstractAggregator {
-  protected final Set<Object> distinct;
+public class DistinctAggregator extends AbstractAggregator implements DataSerializableFixedID {
+  protected HashSet<Object> distinct;
 
   public DistinctAggregator() {
     this.distinct = new HashSet<Object>();
@@ -42,14 +49,35 @@ public class DistinctAggregator extends AbstractAggregator {
   }
 
   @Override
-  public void init() {
-    // TODO Auto-generated method stub
-
-  }
+  public void init() {}
 
   @Override
   public Object terminate() {
     return this.distinct;
   }
 
+  @Override
+  public void merge(Aggregator otherAgg) {
+    this.distinct.addAll(((DistinctAggregator) otherAgg).distinct);
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return AGG_FUNC_DISTINCT_AGG;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(this.distinct, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.distinct = DataSerializer.readHashSet(in);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/MaxMin.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/MaxMin.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/MaxMin.java
index b643c9c..c74ba78 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/MaxMin.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/MaxMin.java
@@ -16,8 +16,15 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
 
 /**
  * Computes the Max or Min
@@ -25,13 +32,17 @@ import com.gemstone.gemfire.cache.query.QueryService;
  *
  */
 
-public class MaxMin implements Aggregator {
-  private final boolean findMax;
+public class MaxMin implements Aggregator, DataSerializableFixedID  {
+  private final boolean findMax; 
   private Comparable currentOptima;
 
   public MaxMin(boolean findMax) {
     this.findMax = findMax;
   }
+  
+  public MaxMin() {
+    this.findMax = false;
+  }
 
   @Override
   public void accumulate(Object value) {
@@ -63,5 +74,29 @@ public class MaxMin implements Aggregator {
   public Object terminate() {
     return currentOptima;
   }
+  
+  @Override
+  public void merge(Aggregator maxMin) {
+    this.accumulate( ((MaxMin)maxMin).currentOptima);
+  }
 
+  @Override
+  public Version[] getSerializationVersions() {   
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return AGG_FUNC_MAX_MIN;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {    
+    DataSerializer.writeObject(this.currentOptima, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.currentOptima = DataSerializer.readObject(in);    
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Sum.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Sum.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Sum.java
index 96f80b8..bd813ed 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Sum.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/Sum.java
@@ -16,17 +16,29 @@
  */
 package com.gemstone.gemfire.cache.query.internal.aggregate;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.query.Aggregator;
 import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
 
 /**
  * Computes the sum for replicated & PR based queries.
  * 
  *
  */
-public class Sum extends AbstractAggregator {
+public class Sum extends AbstractAggregator implements DataSerializableFixedID{
 
   private double result = 0;
 
+  
+  public Sum() {   
+  }
+  
   @Override
   public void accumulate(Object value) {
     if (value != null && value != QueryService.UNDEFINED) {
@@ -44,4 +56,30 @@ public class Sum extends AbstractAggregator {
   public Object terminate() {
     return downCast(result);
   }
+  
+  @Override
+  public void merge(Aggregator aggregator) {
+    Sum sumAgg = (Sum)aggregator;
+    this.result += sumAgg.result; 
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {   
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {    
+    return AGG_FUNC_SUM;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writePrimitiveDouble(this.result, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {    
+    this.result = DataSerializer.readPrimitiveDouble(in);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinct.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinct.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinct.java
index 57c2a9a..3f765f7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinct.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/aggregate/SumDistinct.java
@@ -22,6 +22,9 @@ package com.gemstone.gemfire.cache.query.internal.aggregate;
  */
 public class SumDistinct extends DistinctAggregator {
 
+  public SumDistinct() {   
+  }
+  
   @Override
   public Object terminate() {
     double sum = 0;
@@ -29,6 +32,5 @@ public class SumDistinct extends DistinctAggregator {
       sum += ((Number) o).doubleValue();
     }
     return downCast(sum);
-  }
-
+  }   
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/ASTAggregateFunc.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/ASTAggregateFunc.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/ASTAggregateFunc.java
index 8c989f9..91202ee 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/ASTAggregateFunc.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/ASTAggregateFunc.java
@@ -30,7 +30,7 @@ public class ASTAggregateFunc extends GemFireAST {
   private static final long serialVersionUID = 8713004765228379685L;
   private int  aggFunctionType;
   private boolean  distinctOnly = false;
-  
+  private String udaName;
   
   public ASTAggregateFunc() { 
     
@@ -45,6 +45,10 @@ public class ASTAggregateFunc extends GemFireAST {
     this.aggFunctionType = type;
   }
   
+  public void setUDAName(String name) {
+    this.udaName = name;
+  }
+  
   public void setDistinctOnly(boolean distinctOnly) {
     this.distinctOnly = distinctOnly;
   }
@@ -60,6 +64,10 @@ public class ASTAggregateFunc extends GemFireAST {
         throw new QueryInvalidException("invalid parameter to aggregate function");
       }
     }
-    compiler.aggregateFunction((CompiledValue)expr, this.aggFunctionType, this.distinctOnly);
+    if(this.aggFunctionType == OQLLexerTokenTypes.UDA) {
+      compiler.uda((CompiledValue)expr,this.aggFunctionType,this.getText() );
+    }else {
+      compiler.aggregateFunction((CompiledValue)expr, this.aggFunctionType, this.distinctOnly);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexer.java
index 2ada98d..afb1fb2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexer.java
@@ -1,4 +1,4 @@
-// $ANTLR 2.7.4: "oql.g" -> "OQLLexer.java"$
+// $ANTLR 2.7.7 (20060906): "oql.g" -> "OQLLexer.java"$
 
 package com.gemstone.gemfire.cache.query.internal.parse;
 import java.util.*;
@@ -58,86 +58,86 @@ public OQLLexer(LexerSharedInputState state) {
 	caseSensitiveLiterals = false;
 	setCaseSensitive(false);
 	literals = new Hashtable();
-	literals.put(new ANTLRHashString("type", this), new Integer(78));
-	literals.put(new ANTLRHashString("byte", this), new Integer(131));
-	literals.put(new ANTLRHashString("list", this), new Integer(122));
-	literals.put(new ANTLRHashString("undefine", this), new Integer(72));
-	literals.put(new ANTLRHashString("time", this), new Integer(135));
-	literals.put(new ANTLRHashString("short", this), new Integer(123));
-	literals.put(new ANTLRHashString("dictionary", this), new Integer(139));
-	literals.put(new ANTLRHashString("listtoset", this), new Integer(103));
-	literals.put(new ANTLRHashString("abs", this), new Integer(101));
-	literals.put(new ANTLRHashString("timestamp", this), new Integer(137));
-	literals.put(new ANTLRHashString("limit", this), new Integer(80));
-	literals.put(new ANTLRHashString("distinct", this), new Integer(74));
-	literals.put(new ANTLRHashString("octet", this), new Integer(132));
-	literals.put(new ANTLRHashString("where", this), new Integer(79));
-	literals.put(new ANTLRHashString("orelse", this), new Integer(89));
-	literals.put(new ANTLRHashString("select", this), new Integer(73));
-	literals.put(new ANTLRHashString("and", this), new Integer(90));
-	literals.put(new ANTLRHashString("float", this), new Integer(126));
-	literals.put(new ANTLRHashString("not", this), new Integer(102));
-	literals.put(new ANTLRHashString("interval", this), new Integer(136));
-	literals.put(new ANTLRHashString("date", this), new Integer(134));
-	literals.put(new ANTLRHashString("from", this), new Integer(76));
-	literals.put(new ANTLRHashString("null", this), new Integer(142));
-	literals.put(new ANTLRHashString("flatten", this), new Integer(105));
-	literals.put(new ANTLRHashString("count", this), new Integer(115));
-	literals.put(new ANTLRHashString("last", this), new Integer(109));
-	literals.put(new ANTLRHashString("query", this), new Integer(71));
-	literals.put(new ANTLRHashString("mod", this), new Integer(99));
-	literals.put(new ANTLRHashString("trace", this), new Integer(66));
-	literals.put(new ANTLRHashString("nvl", this), new Integer(106));
-	literals.put(new ANTLRHashString("like", this), new Integer(96));
-	literals.put(new ANTLRHashString("except", this), new Integer(98));
-	literals.put(new ANTLRHashString("set", this), new Integer(120));
-	literals.put(new ANTLRHashString("to_date", this), new Integer(107));
-	literals.put(new ANTLRHashString("intersect", this), new Integer(100));
-	literals.put(new ANTLRHashString("map", this), new Integer(140));
-	literals.put(new ANTLRHashString("array", this), new Integer(119));
-	literals.put(new ANTLRHashString("or", this), new Integer(88));
-	literals.put(new ANTLRHashString("any", this), new Integer(94));
-	literals.put(new ANTLRHashString("double", this), new Integer(127));
-	literals.put(new ANTLRHashString("min", this), new Integer(113));
-	literals.put(new ANTLRHashString("as", this), new Integer(68));
-	literals.put(new ANTLRHashString("first", this), new Integer(108));
-	literals.put(new ANTLRHashString("by", this), new Integer(82));
-	literals.put(new ANTLRHashString("all", this), new Integer(75));
-	literals.put(new ANTLRHashString("union", this), new Integer(97));
-	literals.put(new ANTLRHashString("order", this), new Integer(85));
-	literals.put(new ANTLRHashString("is_defined", this), new Integer(117));
-	literals.put(new ANTLRHashString("collection", this), new Integer(138));
-	literals.put(new ANTLRHashString("some", this), new Integer(95));
-	literals.put(new ANTLRHashString("enum", this), new Integer(133));
-	literals.put(new ANTLRHashString("declare", this), new Integer(69));
-	literals.put(new ANTLRHashString("int", this), new Integer(125));
-	literals.put(new ANTLRHashString("for", this), new Integer(91));
-	literals.put(new ANTLRHashString("is_undefined", this), new Integer(116));
-	literals.put(new ANTLRHashString("boolean", this), new Integer(130));
-	literals.put(new ANTLRHashString("char", this), new Integer(128));
-	literals.put(new ANTLRHashString("define", this), new Integer(70));
-	literals.put(new ANTLRHashString("element", this), new Integer(104));
-	literals.put(new ANTLRHashString("string", this), new Integer(129));
-	literals.put(new ANTLRHashString("hint", this), new Integer(84));
-	literals.put(new ANTLRHashString("false", this), new Integer(145));
-	literals.put(new ANTLRHashString("exists", this), new Integer(92));
-	literals.put(new ANTLRHashString("asc", this), new Integer(86));
-	literals.put(new ANTLRHashString("undefined", this), new Integer(143));
-	literals.put(new ANTLRHashString("desc", this), new Integer(87));
-	literals.put(new ANTLRHashString("bag", this), new Integer(121));
-	literals.put(new ANTLRHashString("max", this), new Integer(114));
-	literals.put(new ANTLRHashString("sum", this), new Integer(111));
-	literals.put(new ANTLRHashString("struct", this), new Integer(118));
-	literals.put(new ANTLRHashString("import", this), new Integer(67));
-	literals.put(new ANTLRHashString("in", this), new Integer(77));
-	literals.put(new ANTLRHashString("avg", this), new Integer(112));
-	literals.put(new ANTLRHashString("true", this), new Integer(144));
-	literals.put(new ANTLRHashString("long", this), new Integer(124));
-	literals.put(new ANTLRHashString("nil", this), new Integer(141));
-	literals.put(new ANTLRHashString("group", this), new Integer(81));
-	literals.put(new ANTLRHashString("having", this), new Integer(83));
-	literals.put(new ANTLRHashString("unique", this), new Integer(110));
-	literals.put(new ANTLRHashString("andthen", this), new Integer(93));
+	literals.put(new ANTLRHashString("type", this), new Integer(79));
+	literals.put(new ANTLRHashString("byte", this), new Integer(132));
+	literals.put(new ANTLRHashString("list", this), new Integer(123));
+	literals.put(new ANTLRHashString("undefine", this), new Integer(73));
+	literals.put(new ANTLRHashString("time", this), new Integer(136));
+	literals.put(new ANTLRHashString("short", this), new Integer(124));
+	literals.put(new ANTLRHashString("dictionary", this), new Integer(140));
+	literals.put(new ANTLRHashString("listtoset", this), new Integer(104));
+	literals.put(new ANTLRHashString("abs", this), new Integer(102));
+	literals.put(new ANTLRHashString("timestamp", this), new Integer(138));
+	literals.put(new ANTLRHashString("limit", this), new Integer(81));
+	literals.put(new ANTLRHashString("distinct", this), new Integer(75));
+	literals.put(new ANTLRHashString("octet", this), new Integer(133));
+	literals.put(new ANTLRHashString("where", this), new Integer(80));
+	literals.put(new ANTLRHashString("orelse", this), new Integer(90));
+	literals.put(new ANTLRHashString("select", this), new Integer(74));
+	literals.put(new ANTLRHashString("and", this), new Integer(91));
+	literals.put(new ANTLRHashString("float", this), new Integer(127));
+	literals.put(new ANTLRHashString("not", this), new Integer(103));
+	literals.put(new ANTLRHashString("interval", this), new Integer(137));
+	literals.put(new ANTLRHashString("date", this), new Integer(135));
+	literals.put(new ANTLRHashString("from", this), new Integer(77));
+	literals.put(new ANTLRHashString("null", this), new Integer(143));
+	literals.put(new ANTLRHashString("flatten", this), new Integer(106));
+	literals.put(new ANTLRHashString("count", this), new Integer(116));
+	literals.put(new ANTLRHashString("last", this), new Integer(110));
+	literals.put(new ANTLRHashString("query", this), new Integer(72));
+	literals.put(new ANTLRHashString("mod", this), new Integer(100));
+	literals.put(new ANTLRHashString("trace", this), new Integer(67));
+	literals.put(new ANTLRHashString("nvl", this), new Integer(107));
+	literals.put(new ANTLRHashString("like", this), new Integer(97));
+	literals.put(new ANTLRHashString("except", this), new Integer(99));
+	literals.put(new ANTLRHashString("set", this), new Integer(121));
+	literals.put(new ANTLRHashString("to_date", this), new Integer(108));
+	literals.put(new ANTLRHashString("intersect", this), new Integer(101));
+	literals.put(new ANTLRHashString("map", this), new Integer(141));
+	literals.put(new ANTLRHashString("array", this), new Integer(120));
+	literals.put(new ANTLRHashString("or", this), new Integer(89));
+	literals.put(new ANTLRHashString("any", this), new Integer(95));
+	literals.put(new ANTLRHashString("double", this), new Integer(128));
+	literals.put(new ANTLRHashString("min", this), new Integer(114));
+	literals.put(new ANTLRHashString("as", this), new Integer(69));
+	literals.put(new ANTLRHashString("first", this), new Integer(109));
+	literals.put(new ANTLRHashString("by", this), new Integer(83));
+	literals.put(new ANTLRHashString("all", this), new Integer(76));
+	literals.put(new ANTLRHashString("union", this), new Integer(98));
+	literals.put(new ANTLRHashString("order", this), new Integer(86));
+	literals.put(new ANTLRHashString("is_defined", this), new Integer(118));
+	literals.put(new ANTLRHashString("collection", this), new Integer(139));
+	literals.put(new ANTLRHashString("some", this), new Integer(96));
+	literals.put(new ANTLRHashString("enum", this), new Integer(134));
+	literals.put(new ANTLRHashString("declare", this), new Integer(70));
+	literals.put(new ANTLRHashString("int", this), new Integer(126));
+	literals.put(new ANTLRHashString("for", this), new Integer(92));
+	literals.put(new ANTLRHashString("is_undefined", this), new Integer(117));
+	literals.put(new ANTLRHashString("boolean", this), new Integer(131));
+	literals.put(new ANTLRHashString("char", this), new Integer(129));
+	literals.put(new ANTLRHashString("define", this), new Integer(71));
+	literals.put(new ANTLRHashString("element", this), new Integer(105));
+	literals.put(new ANTLRHashString("string", this), new Integer(130));
+	literals.put(new ANTLRHashString("hint", this), new Integer(85));
+	literals.put(new ANTLRHashString("false", this), new Integer(146));
+	literals.put(new ANTLRHashString("exists", this), new Integer(93));
+	literals.put(new ANTLRHashString("asc", this), new Integer(87));
+	literals.put(new ANTLRHashString("undefined", this), new Integer(144));
+	literals.put(new ANTLRHashString("desc", this), new Integer(88));
+	literals.put(new ANTLRHashString("bag", this), new Integer(122));
+	literals.put(new ANTLRHashString("max", this), new Integer(115));
+	literals.put(new ANTLRHashString("sum", this), new Integer(112));
+	literals.put(new ANTLRHashString("struct", this), new Integer(119));
+	literals.put(new ANTLRHashString("import", this), new Integer(68));
+	literals.put(new ANTLRHashString("in", this), new Integer(78));
+	literals.put(new ANTLRHashString("avg", this), new Integer(113));
+	literals.put(new ANTLRHashString("true", this), new Integer(145));
+	literals.put(new ANTLRHashString("long", this), new Integer(125));
+	literals.put(new ANTLRHashString("nil", this), new Integer(142));
+	literals.put(new ANTLRHashString("group", this), new Integer(82));
+	literals.put(new ANTLRHashString("having", this), new Integer(84));
+	literals.put(new ANTLRHashString("unique", this), new Integer(111));
+	literals.put(new ANTLRHashString("andthen", this), new Integer(94));
 }
 
 public Token nextToken() throws TokenStreamException {
@@ -1607,7 +1607,7 @@ tryAgain:
 						synPredMatched61 = false;
 					}
 					rewind(_m61);
-					inputState.guessing--;
+inputState.guessing--;
 				}
 				if ( synPredMatched61 ) {
 					{

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.java
index f30f3c0..7e632b2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.java
@@ -1,4 +1,4 @@
-// $ANTLR 2.7.4: "oql.g" -> "OQLParser.java"$
+// $ANTLR 2.7.7 (20060906): "oql.g" -> "OQLParser.java"$
 
 package com.gemstone.gemfire.cache.query.internal.parse;
 import java.util.*;
@@ -69,87 +69,88 @@ public interface OQLLexerTokenTypes {
 	int COUNT = 63;
 	int MAX = 64;
 	int MIN = 65;
-	int LITERAL_trace = 66;
-	int LITERAL_import = 67;
-	int LITERAL_as = 68;
-	int LITERAL_declare = 69;
-	int LITERAL_define = 70;
-	int LITERAL_query = 71;
-	int LITERAL_undefine = 72;
-	int LITERAL_select = 73;
-	int LITERAL_distinct = 74;
-	int LITERAL_all = 75;
-	int LITERAL_from = 76;
-	int LITERAL_in = 77;
-	int LITERAL_type = 78;
-	int LITERAL_where = 79;
-	int LITERAL_limit = 80;
-	int LITERAL_group = 81;
-	int LITERAL_by = 82;
-	int LITERAL_having = 83;
-	int LITERAL_hint = 84;
-	int LITERAL_order = 85;
-	int LITERAL_asc = 86;
-	int LITERAL_desc = 87;
-	int LITERAL_or = 88;
-	int LITERAL_orelse = 89;
-	int LITERAL_and = 90;
-	int LITERAL_for = 91;
-	int LITERAL_exists = 92;
-	int LITERAL_andthen = 93;
-	int LITERAL_any = 94;
-	int LITERAL_some = 95;
-	int LITERAL_like = 96;
-	int LITERAL_union = 97;
-	int LITERAL_except = 98;
-	int LITERAL_mod = 99;
-	int LITERAL_intersect = 100;
-	int LITERAL_abs = 101;
-	int LITERAL_not = 102;
-	int LITERAL_listtoset = 103;
-	int LITERAL_element = 104;
-	int LITERAL_flatten = 105;
-	int LITERAL_nvl = 106;
-	int LITERAL_to_date = 107;
-	int LITERAL_first = 108;
-	int LITERAL_last = 109;
-	int LITERAL_unique = 110;
-	int LITERAL_sum = 111;
-	int LITERAL_avg = 112;
-	int LITERAL_min = 113;
-	int LITERAL_max = 114;
-	int LITERAL_count = 115;
-	int LITERAL_is_undefined = 116;
-	int LITERAL_is_defined = 117;
-	int LITERAL_struct = 118;
-	int LITERAL_array = 119;
-	int LITERAL_set = 120;
-	int LITERAL_bag = 121;
-	int LITERAL_list = 122;
-	int LITERAL_short = 123;
-	int LITERAL_long = 124;
-	int LITERAL_int = 125;
-	int LITERAL_float = 126;
-	int LITERAL_double = 127;
-	int LITERAL_char = 128;
-	int LITERAL_string = 129;
-	int LITERAL_boolean = 130;
-	int LITERAL_byte = 131;
-	int LITERAL_octet = 132;
-	int LITERAL_enum = 133;
-	int LITERAL_date = 134;
-	int LITERAL_time = 135;
-	int LITERAL_interval = 136;
-	int LITERAL_timestamp = 137;
-	int LITERAL_collection = 138;
-	int LITERAL_dictionary = 139;
-	int LITERAL_map = 140;
-	int LITERAL_nil = 141;
-	int LITERAL_null = 142;
-	int LITERAL_undefined = 143;
-	int LITERAL_true = 144;
-	int LITERAL_false = 145;
-	int NUM_LONG = 146;
-	int NUM_FLOAT = 147;
-	int NUM_DOUBLE = 148;
+	int UDA = 66;
+	int LITERAL_trace = 67;
+	int LITERAL_import = 68;
+	int LITERAL_as = 69;
+	int LITERAL_declare = 70;
+	int LITERAL_define = 71;
+	int LITERAL_query = 72;
+	int LITERAL_undefine = 73;
+	int LITERAL_select = 74;
+	int LITERAL_distinct = 75;
+	int LITERAL_all = 76;
+	int LITERAL_from = 77;
+	int LITERAL_in = 78;
+	int LITERAL_type = 79;
+	int LITERAL_where = 80;
+	int LITERAL_limit = 81;
+	int LITERAL_group = 82;
+	int LITERAL_by = 83;
+	int LITERAL_having = 84;
+	int LITERAL_hint = 85;
+	int LITERAL_order = 86;
+	int LITERAL_asc = 87;
+	int LITERAL_desc = 88;
+	int LITERAL_or = 89;
+	int LITERAL_orelse = 90;
+	int LITERAL_and = 91;
+	int LITERAL_for = 92;
+	int LITERAL_exists = 93;
+	int LITERAL_andthen = 94;
+	int LITERAL_any = 95;
+	int LITERAL_some = 96;
+	int LITERAL_like = 97;
+	int LITERAL_union = 98;
+	int LITERAL_except = 99;
+	int LITERAL_mod = 100;
+	int LITERAL_intersect = 101;
+	int LITERAL_abs = 102;
+	int LITERAL_not = 103;
+	int LITERAL_listtoset = 104;
+	int LITERAL_element = 105;
+	int LITERAL_flatten = 106;
+	int LITERAL_nvl = 107;
+	int LITERAL_to_date = 108;
+	int LITERAL_first = 109;
+	int LITERAL_last = 110;
+	int LITERAL_unique = 111;
+	int LITERAL_sum = 112;
+	int LITERAL_avg = 113;
+	int LITERAL_min = 114;
+	int LITERAL_max = 115;
+	int LITERAL_count = 116;
+	int LITERAL_is_undefined = 117;
+	int LITERAL_is_defined = 118;
+	int LITERAL_struct = 119;
+	int LITERAL_array = 120;
+	int LITERAL_set = 121;
+	int LITERAL_bag = 122;
+	int LITERAL_list = 123;
+	int LITERAL_short = 124;
+	int LITERAL_long = 125;
+	int LITERAL_int = 126;
+	int LITERAL_float = 127;
+	int LITERAL_double = 128;
+	int LITERAL_char = 129;
+	int LITERAL_string = 130;
+	int LITERAL_boolean = 131;
+	int LITERAL_byte = 132;
+	int LITERAL_octet = 133;
+	int LITERAL_enum = 134;
+	int LITERAL_date = 135;
+	int LITERAL_time = 136;
+	int LITERAL_interval = 137;
+	int LITERAL_timestamp = 138;
+	int LITERAL_collection = 139;
+	int LITERAL_dictionary = 140;
+	int LITERAL_map = 141;
+	int LITERAL_nil = 142;
+	int LITERAL_null = 143;
+	int LITERAL_undefined = 144;
+	int LITERAL_true = 145;
+	int LITERAL_false = 146;
+	int NUM_LONG = 147;
+	int NUM_FLOAT = 148;
+	int NUM_DOUBLE = 149;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f85cac9/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.txt
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.txt b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.txt
index 8c8fda5..44c262b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.txt
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/query/internal/parse/OQLLexerTokenTypes.txt
@@ -1,4 +1,4 @@
-// $ANTLR 2.7.4: oql.g -> OQLLexerTokenTypes.txt$
+// $ANTLR 2.7.7 (20060906): oql.g -> OQLLexerTokenTypes.txt$
 OQLLexer    // output token vocab name
 TOK_RPAREN=4
 TOK_LPAREN=5
@@ -62,86 +62,87 @@ AVG=62
 COUNT=63
 MAX=64
 MIN=65
-LITERAL_trace="trace"=66
-LITERAL_import="import"=67
-LITERAL_as="as"=68
-LITERAL_declare="declare"=69
-LITERAL_define="define"=70
-LITERAL_query="query"=71
-LITERAL_undefine="undefine"=72
-LITERAL_select="select"=73
-LITERAL_distinct="distinct"=74
-LITERAL_all="all"=75
-LITERAL_from="from"=76
-LITERAL_in="in"=77
-LITERAL_type="type"=78
-LITERAL_where="where"=79
-LITERAL_limit="limit"=80
-LITERAL_group="group"=81
-LITERAL_by="by"=82
-LITERAL_having="having"=83
-LITERAL_hint="hint"=84
-LITERAL_order="order"=85
-LITERAL_asc="asc"=86
-LITERAL_desc="desc"=87
-LITERAL_or="or"=88
-LITERAL_orelse="orelse"=89
-LITERAL_and="and"=90
-LITERAL_for="for"=91
-LITERAL_exists="exists"=92
-LITERAL_andthen="andthen"=93
-LITERAL_any="any"=94
-LITERAL_some="some"=95
-LITERAL_like="like"=96
-LITERAL_union="union"=97
-LITERAL_except="except"=98
-LITERAL_mod="mod"=99
-LITERAL_intersect="intersect"=100
-LITERAL_abs="abs"=101
-LITERAL_not="not"=102
-LITERAL_listtoset="listtoset"=103
-LITERAL_element="element"=104
-LITERAL_flatten="flatten"=105
-LITERAL_nvl="nvl"=106
-LITERAL_to_date="to_date"=107
-LITERAL_first="first"=108
-LITERAL_last="last"=109
-LITERAL_unique="unique"=110
-LITERAL_sum="sum"=111
-LITERAL_avg="avg"=112
-LITERAL_min="min"=113
-LITERAL_max="max"=114
-LITERAL_count="count"=115
-LITERAL_is_undefined="is_undefined"=116
-LITERAL_is_defined="is_defined"=117
-LITERAL_struct="struct"=118
-LITERAL_array="array"=119
-LITERAL_set="set"=120
-LITERAL_bag="bag"=121
-LITERAL_list="list"=122
-LITERAL_short="short"=123
-LITERAL_long="long"=124
-LITERAL_int="int"=125
-LITERAL_float="float"=126
-LITERAL_double="double"=127
-LITERAL_char="char"=128
-LITERAL_string="string"=129
-LITERAL_boolean="boolean"=130
-LITERAL_byte="byte"=131
-LITERAL_octet="octet"=132
-LITERAL_enum="enum"=133
-LITERAL_date="date"=134
-LITERAL_time="time"=135
-LITERAL_interval="interval"=136
-LITERAL_timestamp="timestamp"=137
-LITERAL_collection="collection"=138
-LITERAL_dictionary="dictionary"=139
-LITERAL_map="map"=140
-LITERAL_nil="nil"=141
-LITERAL_null="null"=142
-LITERAL_undefined="undefined"=143
-LITERAL_true="true"=144
-LITERAL_false="false"=145
-NUM_LONG=146
-NUM_FLOAT=147
-NUM_DOUBLE=148
+UDA=66
+LITERAL_trace="trace"=67
+LITERAL_import="import"=68
+LITERAL_as="as"=69
+LITERAL_declare="declare"=70
+LITERAL_define="define"=71
+LITERAL_query="query"=72
+LITERAL_undefine="undefine"=73
+LITERAL_select="select"=74
+LITERAL_distinct="distinct"=75
+LITERAL_all="all"=76
+LITERAL_from="from"=77
+LITERAL_in="in"=78
+LITERAL_type="type"=79
+LITERAL_where="where"=80
+LITERAL_limit="limit"=81
+LITERAL_group="group"=82
+LITERAL_by="by"=83
+LITERAL_having="having"=84
+LITERAL_hint="hint"=85
+LITERAL_order="order"=86
+LITERAL_asc="asc"=87
+LITERAL_desc="desc"=88
+LITERAL_or="or"=89
+LITERAL_orelse="orelse"=90
+LITERAL_and="and"=91
+LITERAL_for="for"=92
+LITERAL_exists="exists"=93
+LITERAL_andthen="andthen"=94
+LITERAL_any="any"=95
+LITERAL_some="some"=96
+LITERAL_like="like"=97
+LITERAL_union="union"=98
+LITERAL_except="except"=99
+LITERAL_mod="mod"=100
+LITERAL_intersect="intersect"=101
+LITERAL_abs="abs"=102
+LITERAL_not="not"=103
+LITERAL_listtoset="listtoset"=104
+LITERAL_element="element"=105
+LITERAL_flatten="flatten"=106
+LITERAL_nvl="nvl"=107
+LITERAL_to_date="to_date"=108
+LITERAL_first="first"=109
+LITERAL_last="last"=110
+LITERAL_unique="unique"=111
+LITERAL_sum="sum"=112
+LITERAL_avg="avg"=113
+LITERAL_min="min"=114
+LITERAL_max="max"=115
+LITERAL_count="count"=116
+LITERAL_is_undefined="is_undefined"=117
+LITERAL_is_defined="is_defined"=118
+LITERAL_struct="struct"=119
+LITERAL_array="array"=120
+LITERAL_set="set"=121
+LITERAL_bag="bag"=122
+LITERAL_list="list"=123
+LITERAL_short="short"=124
+LITERAL_long="long"=125
+LITERAL_int="int"=126
+LITERAL_float="float"=127
+LITERAL_double="double"=128
+LITERAL_char="char"=129
+LITERAL_string="string"=130
+LITERAL_boolean="boolean"=131
+LITERAL_byte="byte"=132
+LITERAL_octet="octet"=133
+LITERAL_enum="enum"=134
+LITERAL_date="date"=135
+LITERAL_time="time"=136
+LITERAL_interval="interval"=137
+LITERAL_timestamp="timestamp"=138
+LITERAL_collection="collection"=139
+LITERAL_dictionary="dictionary"=140
+LITERAL_map="map"=141
+LITERAL_nil="nil"=142
+LITERAL_null="null"=143
+LITERAL_undefined="undefined"=144
+LITERAL_true="true"=145
+LITERAL_false="false"=146
+NUM_LONG=147
+NUM_FLOAT=148
+NUM_DOUBLE=149