Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/20 23:00:35 UTC
[06/14] incubator-geode git commit: GEODE-37 change package name from
io.pivotal.geode (in ./geode-functions/src/main/java/io/pivotal) to
org.apache.geode (in ./geode-functions/src/main/java/org/apache)
GEODE-37 change package name from io.pivotal.geode (in ./geode-functions/src/main/java/io/pivotal) to org.apache.geode (in ./geode-functions/src/main/java/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/54cf6bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/54cf6bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/54cf6bf5
Branch: refs/heads/develop
Commit: 54cf6bf5d826814a4cd82b100a9f638896e48945
Parents: f9a022a
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700
----------------------------------------------------------------------
.../connector/internal/RegionMetadata.java | 93 --------
.../internal/geodefunctions/QueryFunction.java | 99 ---------
.../geodefunctions/RetrieveRegionFunction.java | 208 ------------------
.../RetrieveRegionMetadataFunction.java | 118 ----------
.../StructStreamingResultSender.java | 219 -------------------
.../connector/internal/RegionMetadata.java | 93 ++++++++
.../internal/geodefunctions/QueryFunction.java | 99 +++++++++
.../geodefunctions/RetrieveRegionFunction.java | 208 ++++++++++++++++++
.../RetrieveRegionMetadataFunction.java | 118 ++++++++++
.../StructStreamingResultSender.java | 219 +++++++++++++++++++
10 files changed, 737 insertions(+), 737 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
deleted file mode 100644
index 4fee0e0..0000000
--- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal;
-
-import org.apache.geode.distributed.internal.ServerLocation;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.io.Serializable;
-
-/**
- * This class contains all the info required by the GemFire RDD partitioner to create partitions.
- */
-public class RegionMetadata implements Serializable {
-
- private String regionPath;
- private boolean isPartitioned;
- private int totalBuckets;
- private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
- private String keyTypeName;
- private String valueTypeName;
-
- /**
- * Constructor.
- * @param regionPath the full path of the given region
- * @param isPartitioned true for partitioned region, false otherwise
- * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
- * @param serverBucketMap geode server (host:port pair) to bucket set map
- * @param keyTypeName region key class name
- * @param valueTypeName region value class name
- */
- public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
- String keyTypeName, String valueTypeName) {
- this.regionPath = regionPath;
- this.isPartitioned = isPartitioned;
- this.totalBuckets = totalBuckets;
- this.serverBucketMap = serverBucketMap;
- this.keyTypeName = keyTypeName;
- this.valueTypeName = valueTypeName;
- }
-
- public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
- this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
- }
-
- public String getRegionPath() {
- return regionPath;
- }
-
- public boolean isPartitioned() {
- return isPartitioned;
- }
-
- public int getTotalBuckets() {
- return totalBuckets;
- }
-
- public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
- return serverBucketMap;
- }
-
- public String getKeyTypeName() {
- return keyTypeName;
- }
-
- public String getValueTypeName() {
- return valueTypeName;
- }
-
- public String toString() {
- StringBuilder buf = new StringBuilder();
- buf.append("RegionMetadata(region=").append(regionPath)
- .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
- .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
- .append(", map=").append(serverBucketMap).append(")");
- return buf.toString();
- }
-
-}
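
For context, a minimal sketch of how this metadata class might be constructed and read; the region path, host, and port below are hypothetical:

    import io.pivotal.geode.spark.connector.internal.RegionMetadata;
    import org.apache.geode.distributed.internal.ServerLocation;
    import java.util.HashMap;
    import java.util.HashSet;

    public class RegionMetadataSketch {
      public static void main(String[] args) {
        // Replicated region: no buckets and no server-to-bucket map.
        RegionMetadata replicated =
            new RegionMetadata("/orders", false, 0, null, "java.lang.String", "java.lang.Integer");
        System.out.println(replicated);

        // Partitioned region: map each server (host:port) to the bucket ids it hosts.
        HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
        HashSet<Integer> buckets = new HashSet<>();
        buckets.add(0);
        buckets.add(1);
        serverBucketMap.put(new ServerLocation("host1", 40404), buckets);
        RegionMetadata partitioned = new RegionMetadata("/orders", true, 113, serverBucketMap);
        System.out.println(partitioned.getTotalBuckets());  // prints 113
      }
    }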
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
deleted file mode 100644
index 6e6e295..0000000
--- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.execute.*;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Query;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-import java.util.Iterator;
-
-public class QueryFunction implements Function {
-
- private static final long serialVersionUID = 4866641340803692882L;
-
- public final static String ID = "geode-spark-query-function";
-
- private final static QueryFunction instance = new QueryFunction();
-
- private static final Logger logger = LogService.getLogger();
-
- private static final int CHUNK_SIZE = 1024;
-
- @Override
- public String getId() {
- return ID;
- }
-
- public static QueryFunction getInstance() {
- return instance;
- }
-
- @Override
- public boolean optimizeForWrite() {
- return true;
- }
-
- @Override
- public boolean isHA() {
- return true;
- }
-
- @Override
- public boolean hasResult() {
- return true;
- }
-
- @Override
- public void execute(FunctionContext context) {
- try {
- String[] args = (String[]) context.getArguments();
- String queryString = args[0];
- String bucketSet = args[1];
- InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
- LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
- boolean partitioned = localRegion.getDataPolicy().withPartitioning();
- Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
- Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute();
- ResultSender<Object> sender = context.getResultSender();
- HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
- Iterator<Object> iter = ((SelectResults) result).asList().iterator();
- while (iter.hasNext()) {
- Object row = iter.next();
- DataSerializer.writeObject(row, buf);
- if (buf.size() > CHUNK_SIZE) {
- sender.sendResult(buf.toByteArray());
- logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size());
- buf.reset();
- }
- }
- sender.lastResult(buf.toByteArray());
- logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size());
- buf.reset();
- }
- catch(Exception e) {
- throw new FunctionException(e);
- }
- }
-
-}
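
For context, a hedged sketch of how a client might invoke this function by its ID; the locator address and region name are hypothetical, and `Execution.setArguments` is spelled `withArgs` in some earlier GemFire releases:

    import io.pivotal.geode.spark.connector.internal.geodefunctions.QueryFunction;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.client.ClientCache;
    import org.apache.geode.cache.client.ClientCacheFactory;
    import org.apache.geode.cache.client.ClientRegionShortcut;
    import org.apache.geode.cache.execute.FunctionService;
    import org.apache.geode.cache.execute.ResultCollector;

    public class QueryFunctionClientSketch {
      public static void main(String[] args) {
        ClientCache cache = new ClientCacheFactory()
            .addPoolLocator("localhost", 10334)          // hypothetical locator
            .create();
        Region<Object, Object> region = cache
            .createClientRegionFactory(ClientRegionShortcut.PROXY)
            .create("orders");                           // hypothetical region

        // execute() expects args[0] = OQL query string, args[1] = bucket set description.
        String[] fnArgs = {"select * from /orders", "bucketSet"};
        ResultCollector<?, ?> rc = FunctionService.onRegion(region)
            .setArguments(fnArgs)
            .execute(QueryFunction.ID);                  // "geode-spark-query-function"

        // Each chunk is a DataSerializer-encoded byte[]; the connector's collector
        // deserializes the chunks back into result rows.
        System.out.println(rc.getResult());
        cache.close();
      }
    }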
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
deleted file mode 100644
index d3a2572..0000000
--- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import java.util.Iterator;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.cache.query.Query;
-import org.apache.geode.cache.query.QueryService;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Struct;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import org.apache.geode.internal.cache.execute.InternalResultSender;
-import org.apache.geode.internal.cache.partitioned.PREntriesIterator;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)`
- * to retrieve the region data set for the given bucket set as an RDD partition
- **/
-public class RetrieveRegionFunction implements Function {
-
- public final static String ID = "spark-geode-retrieve-region";
- private static final Logger logger = LogService.getLogger();
- private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();
-
- public RetrieveRegionFunction() {
- }
-
- /** ------------------------------------------ */
- /** interface Function implementation */
- /** ------------------------------------------ */
-
- public static RetrieveRegionFunction getInstance() {
- return instance;
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public boolean hasResult() {
- return true;
- }
-
- @Override
- public boolean optimizeForWrite() {
- return true;
- }
-
- @Override
- public boolean isHA() {
- return true;
- }
-
- @Override
- public void execute(FunctionContext context) {
- String[] args = (String[]) context.getArguments();
- String where = args[0];
- String taskDesc = args[1];
- InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
- LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
- boolean partitioned = localRegion.getDataPolicy().withPartitioning();
- if (where.trim().isEmpty())
- retrieveFullRegion(irfc, partitioned, taskDesc);
- else
- retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
- }
-
- /** ------------------------------------------ */
- /** Retrieve region data with where clause */
- /** ------------------------------------------ */
-
- private void retrieveRegionWithWhereClause(
- InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) {
- String regionPath = localRegion.getFullPath();
- String qstr = "select key, value from " + regionPath + ".entries where " + where;
- logger.info(desc + ": " + qstr);
-
- try {
- Cache cache = CacheFactory.getAnyInstance();
- QueryService queryService = cache.getQueryService();
- Query query = queryService.newQuery(qstr);
- SelectResults<Struct> results =
- (SelectResults<Struct>) (partitioned ? query.execute(context) : query.execute());
-
- Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator());
- InternalResultSender irs = (InternalResultSender) context.getResultSender();
- StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
- sender.send();
- } catch (Exception e) {
- throw new FunctionException(e);
- }
- }
-
- private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
- return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
- @Override public Object[] next() {
- return delegate.next().getFieldValues();
- }
- };
- }
-
- /** ------------------------------------------ */
- /** Retrieve full region data */
- /** ------------------------------------------ */
-
- private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
- Iterator<Object[]> entries;
- if (partitioned) {
- PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
- ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
- // entries = getPREntryIterator(iter);
- entries = getSimpleEntryIterator(iter);
- } else {
- LocalRegion owner = (LocalRegion) context.getDataSet();
- Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator();
- // entries = getRREntryIterator(iter, owner);
- entries = getSimpleEntryIterator(iter);
- }
- InternalResultSender irs = (InternalResultSender) context.getResultSender();
- StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
- sender.send();
- }
-
-// /** An iterator for partitioned region that uses internal API to get serialized value */
-// private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
-// return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
-// @Override public Object[] next() {
-// Region.Entry entry = delegate.next();
-// int bucketId = delegate.getBucketId();
-// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
-// // owner needs to be the bucket region not the enclosing partition region
-// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
-// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
-// return new Object[] {keyInfo.getKey(), value};
-// }
-// };
-// }
-//
-// /** An iterator for replicated region that uses internal API to get serialized value */
-// private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
-// final LocalRegion owner = region;
-// return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
-// @Override public Object[] next() {
-// Region.Entry entry = delegate.next();
-// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
-// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
-// return new Object[] {keyInfo.getKey(), value};
-// }
-// };
-// }
-
- // TODO: compare performance of regular and simple iterators
- /** A general iterator for both partitioned and replicated regions that returns the deserialized value */
- private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
- return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
- @Override public Object[] next() {
- Region.Entry entry = delegate.next();
- return new Object[] {entry.getKey(), entry.getValue()};
- }
- };
- }
-
- /** ------------------------------------------ */
- /** abstract wrapper iterator */
- /** ------------------------------------------ */
-
- /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */
- abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
-
- final S delegate;
-
- protected WrapperIterator(S delegate) {
- this.delegate = delegate;
- }
-
- @Override public boolean hasNext() {
- return delegate.hasNext();
- }
-
- @Override public void remove() { }
- }
-}
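
For context, a minimal sketch of how these function singletons might be registered on each Geode server so that clients can invoke them by ID; the registrar class below is hypothetical:

    import io.pivotal.geode.spark.connector.internal.geodefunctions.QueryFunction;
    import io.pivotal.geode.spark.connector.internal.geodefunctions.RetrieveRegionFunction;
    import io.pivotal.geode.spark.connector.internal.geodefunctions.RetrieveRegionMetadataFunction;
    import org.apache.geode.cache.execute.FunctionService;

    public class ConnectorFunctionRegistrar {
      /** Register each function singleton under its ID on the local server. */
      public static void registerAll() {
        FunctionService.registerFunction(RetrieveRegionFunction.getInstance());
        FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance());
        FunctionService.registerFunction(QueryFunction.getInstance());
      }
    }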
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
deleted file mode 100644
index 6041b70..0000000
--- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.ResultSender;
-import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.BucketServerLocation66;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import io.pivotal.geode.spark.connector.internal.RegionMetadata;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This GemFire function retrieves region metadata
- */
-public class RetrieveRegionMetadataFunction implements Function {
-
- public final static String ID = "geode-spark-retrieve-region-metadata";
-
- private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction();
-
- public RetrieveRegionMetadataFunction() {
- }
-
- public static RetrieveRegionMetadataFunction getInstance() {
- return instance;
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public boolean optimizeForWrite() {
- return false;
- }
-
- @Override
- public boolean isHA() {
- return true;
- }
-
- @Override
- public boolean hasResult() {
- return true;
- }
-
- @Override
- public void execute(FunctionContext context) {
- LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet();
- String regionPath = region.getFullPath();
- boolean isPartitioned = region.getDataPolicy().withPartitioning();
- String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint());
- String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint());
-
- RegionMetadata metadata;
- if (! isPartitioned) {
- metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName);
- } else {
- PartitionedRegion pregion = (PartitionedRegion) region;
- int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
- Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles();
- HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap);
- metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName);
- }
-
- ResultSender<RegionMetadata> sender = context.getResultSender();
- sender.lastResult(metadata);
- }
-
- private String getTypeClassName(Class clazz) {
- return clazz == null ? null : clazz.getCanonicalName();
- }
-
- /** Convert a bucket-to-server map into a server-to-bucket-set map */
- private HashMap<ServerLocation, HashSet<Integer>>
- bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) {
- HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
- for (Integer id : map.keySet()) {
- List<BucketServerLocation66> locations = map.get(id);
- for (BucketServerLocation66 location : locations) {
- ServerLocation server = new ServerLocation(location.getHostName(), location.getPort());
- if (location.isPrimary()) {
- HashSet<Integer> set = serverBucketMap.get(server);
- if (set == null) {
- set = new HashSet<>();
- serverBucketMap.put(server, set);
- }
- set.add(id);
- break;
- }
- }
- }
- return serverBucketMap;
- }
-}
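
For context, `bucketServerMap2ServerBucketSetMap` above inverts the bucket-to-servers map, keeping only the primary server of each bucket. A standalone sketch of the same inversion with plain types; the `Loc` class is a hypothetical stand-in for `BucketServerLocation66`:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class InvertBucketMapSketch {
      /** Hypothetical stand-in for BucketServerLocation66: a server plus a primary flag. */
      static class Loc {
        final String server;
        final boolean primary;
        Loc(String server, boolean primary) { this.server = server; this.primary = primary; }
      }

      /** Invert a bucket-to-locations map into a primary-server-to-bucket-set map. */
      static Map<String, Set<Integer>> invert(Map<Integer, List<Loc>> bucketMap) {
        Map<String, Set<Integer>> serverMap = new HashMap<>();
        for (Map.Entry<Integer, List<Loc>> e : bucketMap.entrySet()) {
          for (Loc loc : e.getValue()) {
            if (loc.primary) {  // keep only the primary copy of each bucket
              serverMap.computeIfAbsent(loc.server, s -> new HashSet<>()).add(e.getKey());
              break;
            }
          }
        }
        return serverMap;
      }

      public static void main(String[] args) {
        Map<Integer, List<Loc>> buckets = new HashMap<>();
        buckets.put(0, Arrays.asList(new Loc("host1:40404", true), new Loc("host2:40404", false)));
        buckets.put(1, Arrays.asList(new Loc("host2:40404", true)));
        System.out.println(invert(buckets));  // e.g. {host1:40404=[0], host2:40404=[1]}
      }
    }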
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
deleted file mode 100644
index 9a7dc9d..0000000
--- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.execute.ResultSender;
-import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
-import org.apache.geode.cache.query.internal.types.StructTypeImpl;
-import org.apache.geode.cache.query.types.ObjectType;
-import org.apache.geode.cache.query.types.StructType;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.cache.CachedDeserializable;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * StructStreamingResultSender and StructStreamingResultCollector are paired
- * to transfer a list of `org.apache.geode.cache.query.Struct` results
- * from the GemFire server to the Spark Connector (the client of the GemFire
- * server) as a stream, i.e., while the sender is sending the result, the
- * collector can start processing the arriving data without waiting for the
- * full result to become available.
- */
-public class StructStreamingResultSender {
-
- public static final byte TYPE_CHUNK = 0x30;
- public static final byte DATA_CHUNK = 0x31;
- public static final byte ERROR_CHUNK = 0x32;
- public static final byte SER_DATA = 0x41;
- public static final byte UNSER_DATA = 0x42;
- public static final byte BYTEARR_DATA = 0x43;
-
- private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
- public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
-
- private static final Logger logger = LogService.getLogger();
- private static final int CHUNK_SIZE = 4096;
-
- // Note: The type of ResultSender returned from GemFire FunctionContext is
- // always ResultSender<Object>, so can't use ResultSender<byte[]> here
- private final ResultSender<Object> sender;
- private final StructType structType;
- private final Iterator<Object[]> rows;
- private String desc;
- private boolean closed = false;
-
- /**
- * Constructor.
- * @param sender the base ResultSender that sends data as byte arrays
- * @param type the StructType of the result records
- * @param rows the iterator over the collection of results
- * @param desc description of this result (used for logging)
- */
- public StructStreamingResultSender(
- ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
- if (sender == null || rows == null)
- throw new NullPointerException("sender=" + sender + ", rows=" + rows);
- this.sender = sender;
- this.structType = type;
- this.rows = rows;
- this.desc = desc;
- }
-
- /** Constructor with default `desc` */
- public StructStreamingResultSender(
- ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
- this(sender, type, rows, "StructStreamingResultSender");
- }
-
- /**
- * Send the result in chunks. There are three types of chunk: TYPE, DATA, and ERROR.
- * A TYPE chunk carries struct type info, DATA chunks carry data, and an ERROR
- * chunk carries an exception. There is at most one TYPE chunk (omitted for
- * `KeyValueType`) and at most one ERROR chunk (if there is an error), but usually
- * there are multiple DATA chunks. Each DATA chunk contains multiple rows of
- * data. The chunk size is determined by the constant `CHUNK_SIZE`. If an
- * exception is thrown, it is serialized and sent as the last chunk of the
- * result (in the form of an ERROR chunk).
- */
- public void send() {
- if (closed) throw new RuntimeException("sender is closed.");
-
- HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
- String dataType = null;
- int typeSize = 0;
- int rowCount = 0;
- int dataSize = 0;
- try {
- if (rows.hasNext()) {
- // Note: only send type info if there's data with it
- typeSize = sendType(buf);
- buf.writeByte(DATA_CHUNK);
- int rowSize = structType == null ? 2 : structType.getFieldNames().length;
- while (rows.hasNext()) {
- rowCount ++;
- Object[] row = rows.next();
- if (rowCount < 2) dataType = entryDataType(row);
- if (rowSize != row.length)
- throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row));
- serializeRowToBuffer(row, buf);
- if (buf.size() > CHUNK_SIZE) {
- dataSize += sendBufferredData(buf, false);
- buf.writeByte(DATA_CHUNK);
- }
- }
- }
- // send last piece of data or empty byte array
- dataSize += sendBufferredData(buf, true);
- logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
- typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
- (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
- } catch (IOException | RuntimeException e) {
- sendException(buf, e);
- } finally {
- closed = true;
- }
- }
-
- private String rowToString(String rowDesc, Object[] row) {
- StringBuilder buf = new StringBuilder();
- buf.append(rowDesc).append("(");
- for (int i = 0; i < row.length; i++) buf.append(i == 0 ? "" : ", ").append(row[i]);
- return buf.append(")").toString();
- }
-
- private String entryDataType(Object[] row) {
- StringBuilder buf = new StringBuilder();
- buf.append("(");
- for (int i = 0; i < row.length; i++) {
- if (i != 0) buf.append(", ");
- buf.append(row[i].getClass().getCanonicalName());
- }
- return buf.append(")").toString();
- }
-
- private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
- for (Object data : row) {
- if (data instanceof CachedDeserializable) {
- buf.writeByte(SER_DATA);
- DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf);
- } else if (data instanceof byte[]) {
- buf.writeByte(BYTEARR_DATA);
- DataSerializer.writeByteArray((byte[]) data, buf);
- } else {
- buf.writeByte(UNSER_DATA);
- DataSerializer.writeObject(data, buf);
- }
- }
- }
-
- /** return the size of type data */
- private int sendType(HeapDataOutputStream buf) throws IOException {
- // logger.info(desc + " struct type: " + structType);
- if (structType != null) {
- buf.writeByte(TYPE_CHUNK);
- DataSerializer.writeObject(structType, buf);
- return sendBufferredData(buf, false);
- } else {
- return 0; // default KeyValue type, no type info sent
- }
- }
-
- private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException {
- if (isLast) sender.lastResult(buf.toByteArray());
- else sender.sendResult(buf.toByteArray());
- // logData(buf.toByteArray(), desc);
- int s = buf.size();
- buf.reset();
- return s;
- }
-
- /** Send the exception as the last chunk of the result. */
- private void sendException(HeapDataOutputStream buf, Exception e) {
- // Note: if an exception happens during serialization, `buf` may contain
- // partially serialized data, which may cause deserialization to hang or fail.
- // Therefore, always empty the buffer before sending the exception.
- if (buf.size() > 0) buf.reset();
-
- try {
- buf.writeByte(ERROR_CHUNK);
- DataSerializer.writeObject(e, buf);
- } catch (IOException ioe) {
- logger.error("StructStreamingResultSender failed to send the result:", e);
- logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
- buf.reset();
- }
- // Note: if serialization of the exception failed, send an empty chunk as the
- // last result; the error is logged on the GemFire server side.
- sender.lastResult(buf.toByteArray());
- // logData(buf.toByteArray(), desc);
- }
-
-// private void logData(byte[] data, String desc) {
-// StringBuilder buf = new StringBuilder();
-// buf.append(desc);
-// for (byte b : data) {
-// buf.append(" ").append(b);
-// }
-// logger.info(buf.toString());
-// }
-
-}
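
For context, a hedged sketch of the per-chunk decoding loop the paired StructStreamingResultCollector would need, based on the chunk and field tags defined above; the class and method names are hypothetical:

    import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender;
    import org.apache.geode.DataSerializer;
    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    public class ChunkDecoderSketch {
      /** Decode a single chunk produced by StructStreamingResultSender. */
      public static void decodeChunk(byte[] chunk) throws Exception {
        if (chunk.length == 0) return;  // empty last chunk: no data, or exception serialization failed
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(chunk));
        byte tag = in.readByte();
        switch (tag) {
          case StructStreamingResultSender.TYPE_CHUNK:
            // A serialized StructType follows; KeyValueType is assumed when omitted.
            Object structType = DataSerializer.readObject(in);
            break;
          case StructStreamingResultSender.DATA_CHUNK:
            // Repeated row fields, each prefixed with SER_DATA, BYTEARR_DATA, or UNSER_DATA.
            while (in.available() > 0) {
              byte fieldTag = in.readByte();
              Object field = (fieldTag == StructStreamingResultSender.UNSER_DATA)
                  ? DataSerializer.readObject(in)      // plain object
                  : DataSerializer.readByteArray(in);  // serialized value or raw byte[]
            }
            break;
          case StructStreamingResultSender.ERROR_CHUNK:
            // A serialized exception follows; rethrow it on the collector side.
            throw (Exception) DataSerializer.readObject(in);
        }
      }
    }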
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
new file mode 100644
index 0000000..4fee0e0
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal;
+
+import org.apache.geode.distributed.internal.ServerLocation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.io.Serializable;
+
+/**
+ * This class contains all the info required by the GemFire RDD partitioner to create partitions.
+ */
+public class RegionMetadata implements Serializable {
+
+ private String regionPath;
+ private boolean isPartitioned;
+ private int totalBuckets;
+ private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
+ private String keyTypeName;
+ private String valueTypeName;
+
+ /**
+ * Constructor.
+ * @param regionPath the full path of the given region
+ * @param isPartitioned true for partitioned region, false otherwise
+ * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
+ * @param serverBucketMap geode server (host:port pair) to bucket set map
+ * @param keyTypeName region key class name
+ * @param valueTypeName region value class name
+ */
+ public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
+ String keyTypeName, String valueTypeName) {
+ this.regionPath = regionPath;
+ this.isPartitioned = isPartitioned;
+ this.totalBuckets = totalBuckets;
+ this.serverBucketMap = serverBucketMap;
+ this.keyTypeName = keyTypeName;
+ this.valueTypeName = valueTypeName;
+ }
+
+ public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
+ this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
+ }
+
+ public String getRegionPath() {
+ return regionPath;
+ }
+
+ public boolean isPartitioned() {
+ return isPartitioned;
+ }
+
+ public int getTotalBuckets() {
+ return totalBuckets;
+ }
+
+ public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
+ return serverBucketMap;
+ }
+
+ public String getKeyTypeName() {
+ return keyTypeName;
+ }
+
+ public String getValueTypeName() {
+ return valueTypeName;
+ }
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("RegionMetadata(region=").append(regionPath)
+ .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
+ .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
+ .append(", map=").append(serverBucketMap).append(")");
+ return buf.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
new file mode 100644
index 0000000..6e6e295
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.execute.*;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import java.util.Iterator;
+
+public class QueryFunction implements Function {
+
+ private static final long serialVersionUID = 4866641340803692882L;
+
+ public final static String ID = "geode-spark-query-function";
+
+ private final static QueryFunction instance = new QueryFunction();
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final int CHUNK_SIZE = 1024;
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ public static QueryFunction getInstance() {
+ return instance;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return true;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public void execute(FunctionContext context) {
+ try {
+ String[] args = (String[]) context.getArguments();
+ String queryString = args[0];
+ String bucketSet = args[1];
+ InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+ LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+ boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+ Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
+ Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute();
+ ResultSender<Object> sender = context.getResultSender();
+ HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
+ Iterator<Object> iter = ((SelectResults) result).asList().iterator();
+ while (iter.hasNext()) {
+ Object row = iter.next();
+ DataSerializer.writeObject(row, buf);
+ if (buf.size() > CHUNK_SIZE) {
+ sender.sendResult(buf.toByteArray());
+ logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size());
+ buf.reset();
+ }
+ }
+ sender.lastResult(buf.toByteArray());
+ logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size());
+ buf.reset();
+ }
+ catch(Exception e) {
+ throw new FunctionException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
new file mode 100644
index 0000000..d3a2572
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import java.util.Iterator;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.internal.cache.*;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.cache.execute.InternalResultSender;
+import org.apache.geode.internal.cache.partitioned.PREntriesIterator;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)`
+ * to retrieve the region data set for the given bucket set as an RDD partition
+ **/
+public class RetrieveRegionFunction implements Function {
+
+ public final static String ID = "spark-geode-retrieve-region";
+ private static final Logger logger = LogService.getLogger();
+ private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();
+
+ public RetrieveRegionFunction() {
+ }
+
+ /** ------------------------------------------ */
+ /** interface Function implementation */
+ /** ------------------------------------------ */
+
+ public static RetrieveRegionFunction getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return true;
+ }
+
+ @Override
+ public void execute(FunctionContext context) {
+ String[] args = (String[]) context.getArguments();
+ String where = args[0];
+ String taskDesc = args[1];
+ InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+ LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+ boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+ if (where.trim().isEmpty())
+ retrieveFullRegion(irfc, partitioned, taskDesc);
+ else
+ retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
+ }
+
+ /** ------------------------------------------ */
+ /** Retrieve region data with where clause */
+ /** ------------------------------------------ */
+
+ private void retrieveRegionWithWhereClause(
+ InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) {
+ String regionPath = localRegion.getFullPath();
+ String qstr = "select key, value from " + regionPath + ".entries where " + where;
+ logger.info(desc + ": " + qstr);
+
+ try {
+ Cache cache = CacheFactory.getAnyInstance();
+ QueryService queryService = cache.getQueryService();
+ Query query = queryService.newQuery(qstr);
+ SelectResults<Struct> results =
+ (SelectResults<Struct>) (partitioned ? query.execute(context) : query.execute());
+
+ Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator());
+ InternalResultSender irs = (InternalResultSender) context.getResultSender();
+ StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+ sender.send();
+ } catch (Exception e) {
+ throw new FunctionException(e);
+ }
+ }
+
+ private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
+ return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
+ @Override public Object[] next() {
+ return delegate.next().getFieldValues();
+ }
+ };
+ }
+
+ /** ------------------------------------------ */
+ /** Retrieve full region data */
+ /** ------------------------------------------ */
+
+ private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
+ Iterator<Object[]> entries;
+ if (partitioned) {
+ PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
+ ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
+ // entries = getPREntryIterator(iter);
+ entries = getSimpleEntryIterator(iter);
+ } else {
+ LocalRegion owner = (LocalRegion) context.getDataSet();
+ Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator();
+ // entries = getRREntryIterator(iter, owner);
+ entries = getSimpleEntryIterator(iter);
+ }
+ InternalResultSender irs = (InternalResultSender) context.getResultSender();
+ StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+ sender.send();
+ }
+
+// /** An iterator for partitioned region that uses internal API to get serialized value */
+// private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
+// return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
+// @Override public Object[] next() {
+// Region.Entry entry = delegate.next();
+// int bucketId = delegate.getBucketId();
+// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
+// // owner needs to be the bucket region not the enclosing partition region
+// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
+// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+// return new Object[] {keyInfo.getKey(), value};
+// }
+// };
+// }
+//
+// /** An iterator for replicated region that uses internal API to get serialized value */
+// private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
+// final LocalRegion owner = region;
+// return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+// @Override public Object[] next() {
+// Region.Entry entry = delegate.next();
+// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
+// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+// return new Object[] {keyInfo.getKey(), value};
+// }
+// };
+// }
+
+ // TODO: compare performance of regular and simple iterators
+ /** A general iterator for both partitioned and replicated regions that returns the deserialized value */
+ private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
+ return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+ @Override public Object[] next() {
+ Region.Entry entry = delegate.next();
+ return new Object[] {entry.getKey(), entry.getValue()};
+ }
+ };
+ }
+
+ /** ------------------------------------------ */
+ /** abstract wrapper iterator */
+ /** ------------------------------------------ */
+
+ /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */
+ abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
+
+ final S delegate;
+
+ protected WrapperIterator(S delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override public void remove() { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
new file mode 100644
index 0000000..6041b70
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.BucketServerLocation66;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import io.pivotal.geode.spark.connector.internal.RegionMetadata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This GemFire function retrieves region metadata
+ */
+public class RetrieveRegionMetadataFunction implements Function {
+
+ public final static String ID = "geode-spark-retrieve-region-metadata";
+
+ private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction();
+
+ public RetrieveRegionMetadataFunction() {
+ }
+
+ public static RetrieveRegionMetadataFunction getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean isHA() {
+ return true;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public void execute(FunctionContext context) {
+ LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet();
+ String regionPath = region.getFullPath();
+ boolean isPartitioned = region.getDataPolicy().withPartitioning();
+ String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint());
+ String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint());
+
+ RegionMetadata metadata;
+ if (! isPartitioned) {
+ metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName);
+ } else {
+ PartitionedRegion pregion = (PartitionedRegion) region;
+ int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
+ Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles();
+ HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap);
+ metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName);
+ }
+
+ ResultSender<RegionMetadata> sender = context.getResultSender();
+ sender.lastResult(metadata);
+ }
+
+ private String getTypeClassName(Class clazz) {
+ return clazz == null ? null : clazz.getCanonicalName();
+ }
+
+ /** Convert a bucket-to-server map into a server-to-bucket-set map */
+ private HashMap<ServerLocation, HashSet<Integer>>
+ bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) {
+ HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
+ for (Integer id : map.keySet()) {
+ List<BucketServerLocation66> locations = map.get(id);
+ for (BucketServerLocation66 location : locations) {
+ ServerLocation server = new ServerLocation(location.getHostName(), location.getPort());
+ if (location.isPrimary()) {
+ HashSet<Integer> set = serverBucketMap.get(server);
+ if (set == null) {
+ set = new HashSet<>();
+ serverBucketMap.put(server, set);
+ }
+ set.add(id);
+ break;
+ }
+ }
+ }
+ return serverBucketMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
new file mode 100644
index 0000000..9a7dc9d
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
+import org.apache.geode.cache.query.types.ObjectType;
+import org.apache.geode.cache.query.types.StructType;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * StructStreamingResultSender and StructStreamingResultCollector are paired
+ * to transfer a list of `org.apache.geode.cache.query.Struct` results from
+ * the GemFire server to the Spark Connector (the client of the GemFire
+ * server) in a streaming fashion: while the sender is still sending results,
+ * the collector can start processing those that have already arrived,
+ * without waiting for the full result set to become available.
+ */
+public class StructStreamingResultSender {
+
+ public static final byte TYPE_CHUNK = 0x30;
+ public static final byte DATA_CHUNK = 0x31;
+ public static final byte ERROR_CHUNK = 0x32;
+ public static final byte SER_DATA = 0x41;
+ public static final byte UNSER_DATA = 0x42;
+ public static final byte BYTEARR_DATA = 0x43;
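+ // Framing note: each non-empty chunk begins with one of the chunk tags above;
+ // within a DATA chunk, every column value is preceded by one of the *_DATA
+ // tags so the receiver knows how to deserialize it.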
+
+ private static final ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
+ public static final StructTypeImpl KeyValueType = new StructTypeImpl(
+ new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+ private static final Logger logger = LogService.getLogger();
+ private static final int CHUNK_SIZE = 4096;
+
+ // Note: the ResultSender returned from the GemFire FunctionContext is always
+ // ResultSender<Object>, so ResultSender<byte[]> cannot be used here
+ private final ResultSender<Object> sender;
+ private final StructType structType;
+ private final Iterator<Object[]> rows;
+ private String desc;
+ private boolean closed = false;
+
+ /**
+ * Constructor.
+ * @param sender the underlying ResultSender that sends data as byte arrays
+ * @param type the StructType of the result records
+ * @param rows an iterator over the collection of result rows
+ * @param desc a description of this result (used for logging)
+ */
+ public StructStreamingResultSender(
+ ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
+ if (sender == null || rows == null)
+ throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+ this.sender = sender;
+ this.structType = type;
+ this.rows = rows;
+ this.desc = desc;
+ }
+
+ /** Constructor with a default `desc`. */
+ public StructStreamingResultSender(
+ ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
+ this(sender, type, rows, "StructStreamingResultSender");
+ }
+
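+ // Example of the stream produced by send() for a two-column result:
+ // [TYPE_CHUNK][StructType] -- at most once, omitted for KeyValueType
+ // [DATA_CHUNK][tag][col1][tag][col2]... -- flushed once CHUNK_SIZE is exceeded
+ // [DATA_CHUNK]... -- the final chunk is delivered via lastResult()
+ // On failure the stream ends with [ERROR_CHUNK][serialized exception] instead.
+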
+ /**
+ * Send the result in chunks. There are three chunk types: TYPE, DATA, and
+ * ERROR. A TYPE chunk carries the struct type info, DATA chunks carry the
+ * data, and an ERROR chunk carries a serialized exception. There is at most
+ * one TYPE chunk (omitted for `KeyValueType`) and at most one ERROR chunk
+ * (present only if an error occurred), but usually many DATA chunks. Each
+ * DATA chunk contains multiple rows of data; the chunk size is bounded by
+ * the constant `CHUNK_SIZE`. If an exception is thrown, it is serialized
+ * and sent as the last chunk of the result (as an ERROR chunk).
+ */
+ public void send() {
+ if (closed) throw new IllegalStateException("sender is closed.");
+
+ HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
+ String dataType = null;
+ int typeSize = 0;
+ int rowCount = 0;
+ int dataSize = 0;
+ try {
+ if (rows.hasNext()) {
+ // Note: type info is only sent when there is at least one row of data
+ typeSize = sendType(buf);
+ buf.writeByte(DATA_CHUNK);
+ int rowSize = structType == null ? 2 : structType.getFieldNames().length;
+ while (rows.hasNext()) {
+ rowCount++;
+ Object[] row = rows.next();
+ if (rowCount < 2) dataType = entryDataType(row);
+ if (rowSize != row.length)
+ throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row));
+ serializeRowToBuffer(row, buf);
+ if (buf.size() > CHUNK_SIZE) {
+ dataSize += sendBufferedData(buf, false);
+ buf.writeByte(DATA_CHUNK);
+ }
+ }
+ }
+ // send last piece of data or empty byte array
+ dataSize += sendBufferedData(buf, true);
+ logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+ typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+ (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
+ } catch (IOException | RuntimeException e) {
+ sendException(buf, e);
+ } finally {
+ closed = true;
+ }
+ }
+
+ private String rowToString(String rowDesc, Object[] row) {
+ StringBuilder buf = new StringBuilder();
+ buf.append(rowDesc).append("(");
+ for (int i = 0; i < row.length; i++) buf.append(i == 0 ? "" : ", ").append(row[i]);
+ return buf.append(")").toString();
+ }
+
+ private String entryDataType(Object[] row) {
+ StringBuilder buf = new StringBuilder();
+ buf.append("(");
+ for (int i = 0; i < row.length; i++) {
+ if (i != 0) buf.append(", ");
+ buf.append(row[i] == null ? "null" : row[i].getClass().getCanonicalName());
+ }
+ return buf.append(")").toString();
+ }
+
+ private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
+ for (Object data : row) {
+ if (data instanceof CachedDeserializable) {
+ buf.writeByte(SER_DATA);
+ DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf);
+ } else if (data instanceof byte[]) {
+ buf.writeByte(BYTEARR_DATA);
+ DataSerializer.writeByteArray((byte[]) data, buf);
+ } else {
+ buf.writeByte(UNSER_DATA);
+ DataSerializer.writeObject(data, buf);
+ }
+ }
+ }
+
+ /** Send the struct type info as a TYPE chunk; return the number of bytes sent. */
+ private int sendType(HeapDataOutputStream buf) throws IOException {
+ // logger.info(desc + " struct type: " + structType);
+ if (structType != null) {
+ buf.writeByte(TYPE_CHUNK);
+ DataSerializer.writeObject(structType, buf);
+ return sendBufferedData(buf, false);
+ } else {
+ return 0; // default KeyValueType, no type info sent
+ }
+ }
+
+ private int sendBufferedData(HeapDataOutputStream buf, boolean isLast) throws IOException {
+ if (isLast) sender.lastResult(buf.toByteArray());
+ else sender.sendResult(buf.toByteArray());
+ // logData(buf.toByteArray(), desc);
+ int s = buf.size();
+ buf.reset();
+ return s;
+ }
+
+ /** Send the exception as the last chunk of the result. */
+ private void sendException(HeapDataOutputStream buf, Exception e) {
+ // Note: if an exception occurs during serialization, `buf` may contain
+ // partially serialized data, which could make deserialization hang or fail.
+ // Therefore, always empty the buffer before sending the exception.
+ if (buf.size() > 0) buf.reset();
+
+ try {
+ buf.writeByte(ERROR_CHUNK);
+ DataSerializer.writeObject(e, buf);
+ } catch (IOException ioe) {
+ logger.error("StructStreamingResultSender failed to send the result:", e);
+ logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
+ buf.reset();
+ }
+ // Note: if serialization of the exception itself failed, an empty chunk is
+ // sent as the last result, and the error is logged on the GemFire server side.
+ sender.lastResult(buf.toByteArray());
+ // logData(buf.toByteArray(), desc);
+ }
+
+// private void logData(byte[] data, String desc) {
+// StringBuilder buf = new StringBuilder();
+// buf.append(desc);
+// for (byte b : data) {
+// buf.append(" ").append(b);
+// }
+// logger.info(buf.toString());
+// }
+
+}
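
For reference, the collector side must mirror the chunk and data tags written
by this sender. Below is a minimal decoding sketch for one received chunk,
assuming the sender's public tag constants are on the classpath and relying on
the fact that rows never span chunks (the sender flushes only at row
boundaries). It is illustrative only -- the connector's actual
StructStreamingResultCollector is a separate class not shown in this commit:

    import org.apache.geode.DataSerializer;
    import org.apache.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender;

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ChunkDecoderSketch {

      /** Decode one chunk from the sender; return the rows it contains (possibly none). */
      public static List<Object[]> decodeChunk(byte[] chunk, int columnsPerRow)
          throws IOException, ClassNotFoundException {
        List<Object[]> rows = new ArrayList<>();
        if (chunk.length == 0) return rows;  // empty final chunk
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(chunk));
        byte chunkTag = in.readByte();
        if (chunkTag == StructStreamingResultSender.TYPE_CHUNK) {
          System.out.println("row type: " + DataSerializer.readObject(in));
          return rows;
        }
        if (chunkTag == StructStreamingResultSender.ERROR_CHUNK) {
          Exception cause = DataSerializer.readObject(in);
          throw new IOException("sender reported an error", cause);
        }
        // DATA_CHUNK: tag-prefixed column values, row after row, until the chunk ends
        while (true) {
          Object[] row = new Object[columnsPerRow];
          for (int i = 0; i < columnsPerRow; i++) {
            byte dataTag;
            try {
              dataTag = in.readByte();
            } catch (EOFException eof) {
              return rows;  // chunk exhausted at a row boundary
            }
            if (dataTag == StructStreamingResultSender.SER_DATA) {
              // value arrived still serialized; deserialize it from its byte array
              byte[] ser = DataSerializer.readByteArray(in);
              row[i] = DataSerializer.readObject(
                  new DataInputStream(new ByteArrayInputStream(ser)));
            } else if (dataTag == StructStreamingResultSender.BYTEARR_DATA) {
              row[i] = DataSerializer.readByteArray(in);
            } else {  // UNSER_DATA
              row[i] = DataSerializer.readObject(in);
            }
          }
          rows.add(row);
        }
      }
    }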