Posted to dev@sdap.apache.org by GitBox <gi...@apache.org> on 2018/03/12 18:22:54 UTC

[GitHub] fgreg closed pull request #6: SDAP-32 CassandraProxy time series tiles to support time arrays

URL: https://github.com/apache/incubator-sdap-nexus/pull/6

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/data-access/.gitignore b/data-access/.gitignore
index b806e30..3b3f92b 100644
--- a/data-access/.gitignore
+++ b/data-access/.gitignore
@@ -34,6 +34,7 @@ __pycache__/
 
 # C extensions
 *.so
+*.c
 
 # Distribution / packaging
 .Python
diff --git a/data-access/nexustiles/config/datastores.ini b/data-access/nexustiles/config/datastores.ini
index e578703..7c1c82e 100644
--- a/data-access/nexustiles/config/datastores.ini
+++ b/data-access/nexustiles/config/datastores.ini
@@ -17,4 +17,4 @@ host=localhost:8983
 core=nexustiles
 
 [datastore]
-store=s3
\ No newline at end of file
+store=cassandra
\ No newline at end of file
diff --git a/data-access/nexustiles/dao/CassandraProxy.pyx b/data-access/nexustiles/dao/CassandraProxy.pyx
index 8878dff..49f272d 100644
--- a/data-access/nexustiles/dao/CassandraProxy.pyx
+++ b/data-access/nexustiles/dao/CassandraProxy.pyx
@@ -17,7 +17,7 @@ import uuid
 from ConfigParser import NoOptionError
 from multiprocessing.synchronize import Lock
 
-import nexusproto.NexusContent_pb2 as nexusproto
+import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
 from cassandra.cqlengine import columns, connection, CQLEngineException
 from cassandra.cqlengine.models import Model
@@ -99,19 +99,24 @@ class NexusTileData(Model):
             time_series_tile = self._get_nexus_tile().time_series_tile
 
             time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data))
-            time_data = np.array([time_series_tile.time])
+            time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1)
             latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude))
             longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude))
 
-            tile_data = self._to_standard_index(time_series_tile_data,
-                                                (len(time_data), len(latitude_data), len(longitude_data)))
-
+            reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
+            idx = np.arange(len(latitude_data))
+            reshaped_array[:, idx, idx] = time_series_tile_data
+            tile_data = reshaped_array
             # Extract the meta data
             meta_data = {}
             for meta_data_obj in time_series_tile.meta_data:
                 name = meta_data_obj.name
                 meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-                reshaped_meta_array = self._to_standard_index(meta_array, tile_data.shape)
+
+                reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
+                idx = np.arange(len(latitude_data))
+                reshaped_meta_array[:, idx, idx] = meta_array
+
                 meta_data[name] = reshaped_meta_array
 
             return latitude_data, longitude_data, time_data, tile_data, meta_data
diff --git a/data-access/nexustiles/dao/DynamoProxy.pyx b/data-access/nexustiles/dao/DynamoProxy.pyx
index 80641db..b10cb12 100644
--- a/data-access/nexustiles/dao/DynamoProxy.pyx
+++ b/data-access/nexustiles/dao/DynamoProxy.pyx
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import uuid
-import nexusproto.NexusContent_pb2 as nexusproto
+import nexusproto.DataTile_pb2 as nexusproto
 from nexusproto.serialization import from_shaped_array
 import numpy as np
 import boto3
diff --git a/data-access/nexustiles/dao/S3Proxy.pyx b/data-access/nexustiles/dao/S3Proxy.pyx
index bdac8a9..29e5191 100644
--- a/data-access/nexustiles/dao/S3Proxy.pyx
+++ b/data-access/nexustiles/dao/S3Proxy.pyx
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import uuid
-import nexusproto.NexusContent_pb2 as nexusproto
-from nexusproto.serialization import from_shaped_array
-import numpy as np
+
 import boto3
+import nexusproto.DataTile_pb2 as nexusproto
+import numpy as np
+from nexusproto.serialization import from_shaped_array
+
 
 class NexusTileData(object):
     __nexus_tile = None
@@ -127,7 +129,6 @@ class S3Proxy(object):
         self.__nexus_tile = None
 
     def fetch_nexus_tiles(self, *tile_ids):
-
         tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
                     (isinstance(tile_id, str) or isinstance(tile_id, unicode))]
         res = []
@@ -137,4 +138,4 @@ class S3Proxy(object):
             nexus_tile = NexusTileData(data, str(tile_id))
             res.append(nexus_tile)
 
-        return res
\ No newline at end of file
+        return res
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 26bd7a2..2888b1d 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -21,10 +21,10 @@
 import numpy as np
 import numpy.ma as ma
 import pkg_resources
-from dao.CassandraProxy import CassandraProxy
-from dao.S3Proxy import S3Proxy
-from dao.DynamoProxy import DynamoProxy
-from dao.SolrProxy import SolrProxy
+import dao.CassandraProxy
+import dao.S3Proxy
+import dao.DynamoProxy
+import dao.SolrProxy
 from pytz import timezone
 from shapely.geometry import MultiPolygon, box
 
@@ -73,16 +73,16 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
         if not skipDatastore:
             datastore = self._config.get("datastore", "store")
             if datastore == "cassandra":
-                self._datastore = CassandraProxy(self._config)
+                self._datastore = dao.CassandraProxy.CassandraProxy(self._config)
             elif datastore == "s3":
-                self._datastore = S3Proxy(self._config)
+                self._datastore = dao.S3Proxy.S3Proxy(self._config)
             elif datastore == "dynamo":
-                self._datastore = DynamoProxy(self._config)
+                self._datastore = dao.DynamoProxy.DynamoProxy(self._config)
             else:
                 raise ValueError("Error reading datastore from config file")
 
         if not skipMetadatastore:
-            self._metadatastore = SolrProxy(self._config)
+            self._metadatastore = dao.SolrProxy.SolrProxy(self._config)
 
     def get_dataseries_list(self, simple=False):
         if simple:
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index 2bcec51..4975e61 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -14,7 +14,7 @@ futures==3.1.1
 ipython==5.3.0
 ipython-genutils==0.2.0
 jmespath==0.9.3
-nexusproto==0.4
+nexusproto==1.0.0-SNAPSHOT
 numpy==1.11.1
 pathlib2==2.2.1
 pexpect==4.2.1
diff --git a/data-access/setup.py b/data-access/setup.py
index 0375814..ee1201d 100644
--- a/data-access/setup.py
+++ b/data-access/setup.py
@@ -36,7 +36,7 @@
         'cassandra-driver==3.5.0',
         'solrpy==0.9.7',
         'requests',
-        'nexusproto==0.4',
+        'nexusproto==1.0.0-SNAPSHOT',
         'shapely'
     ],
 
diff --git a/data-access/tests/nexustiles_test.py b/data-access/tests/nexustiles_test.py
index 9318795..9b2901e 100644
--- a/data-access/tests/nexustiles_test.py
+++ b/data-access/tests/nexustiles_test.py
@@ -29,11 +29,14 @@ def setUp(self):
 keyspace=nexustiles
 local_datacenter=datacenter1
 protocol_version=3
-port=32769
+port=9042
 
 [solr]
-host=localhost:8986
-core=nexustiles""")
+host=localhost:8983
+core=nexustiles
+
+[datastore]
+store=cassandra""")
         cp = ConfigParser.RawConfigParser()
         cp.readfp(config)
 
@@ -68,6 +71,11 @@ def test_sorted_box(self):
         for tile in tiles:
             print tile.min_time
 
+    def test_time_series_tile(self):
+        tiles = self.tile_service.find_tile_by_id("055c0b51-d0fb-3f39-b48a-4f762bf0c994")
+        for tile in tiles:
+            print tile.get_summary()
+
 
 # from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
 # import numpy as np
diff --git a/tools/deletebyquery/deletebyquery.py b/tools/deletebyquery/deletebyquery.py
index 6732e1a..f703448 100644
--- a/tools/deletebyquery/deletebyquery.py
+++ b/tools/deletebyquery/deletebyquery.py
@@ -29,6 +29,7 @@
 
 solr_connection = None
 solr_collection = None
+SOLR_UNIQUE_KEY = None
 
 cassandra_cluster = None
 cassandra_session = None
@@ -45,6 +46,8 @@ def init(args):
     solr_connection = SolrConnection(args.solr)
     global solr_collection
     solr_collection = solr_connection[args.collection]
+    global SOLR_UNIQUE_KEY
+    SOLR_UNIQUE_KEY = args.solrIdField
 
     dc_policy = RoundRobinPolicy()
     token_policy = TokenAwarePolicy(dc_policy)
@@ -64,6 +67,7 @@ def delete_by_query(args):
     if args.query:
         se = SearchOptions()
         se.commonparams.q(args.query) \
+            .fl(SOLR_UNIQUE_KEY) \
             .fl('id')
 
         for fq in args.filterquery if args.filterquery is not None else []:
@@ -72,7 +76,8 @@ def delete_by_query(args):
         query = se
     elif args.jsonparams:
         se = SearchOptions(**json.loads(args.jsonparams))
-        se.commonparams.fl('id')
+        se.commonparams.fl(SOLR_UNIQUE_KEY) \
+            .fl('id')
         query = se
     else:
         raise RuntimeError("either query or jsonparams is required")
@@ -82,7 +87,7 @@ def delete_by_query(args):
         solr_docs = do_solr_query(query)
 
         if confirm_delete(len(solr_docs)):
-            deleted_ids = do_delete(solr_docs)
+            deleted_ids = do_delete(solr_docs, query)
             logging.info("Deleted tile IDs %s" % json.dumps([str(doc_id) for doc_id in deleted_ids], indent=2))
         else:
             logging.info("Exiting")
@@ -123,7 +128,7 @@ def check_query(query):
         return False
     else:
         se = SearchOptions()
-        se.commonparams.q('id:%s' % sample(solr_response.result.response.docs, 1)[0]['id'])
+        se.commonparams.q('%s:%s' % (SOLR_UNIQUE_KEY, sample(solr_response.result.response.docs, 1)[0][SOLR_UNIQUE_KEY]))
         logging.info(json.dumps(solr_collection.search(se).result.response.docs[0], indent=2))
         return check_query(query)
 
@@ -132,7 +137,7 @@ def do_solr_query(query):
     doc_ids = []
 
     next_cursor_mark = "*"
-    query.commonparams.sort('id asc')
+    query.commonparams.sort('%s asc' % SOLR_UNIQUE_KEY)
     while True:
         query.commonparams.remove_param('cursorMark')
         query.commonparams.add_params(cursorMark=next_cursor_mark)
@@ -154,11 +159,11 @@ def do_solr_query(query):
     return doc_ids
 
 
-def do_delete(doc_ids):
+def do_delete(doc_ids, query):
     logging.info("Executing Cassandra delete...")
     delete_from_cassandra(doc_ids)
     logging.info("Executing Solr delete...")
-    delete_from_solr(doc_ids)
+    delete_from_solr(query)
     return doc_ids
 
 
@@ -170,12 +175,11 @@ def delete_from_cassandra(doc_ids):
 
     for (success, result) in results:
         if not success:
-            logging.warn("Could not delete tile %s" % result)
+            logging.warning("Could not delete tile %s" % result)
 
 
-def delete_from_solr(doc_ids):
-    for doc_id in doc_ids:
-        solr_collection.delete({'q': "id:%s" % doc_id}, commit=False)
+def delete_from_solr(query):
+    solr_collection.delete(query, commit=False)
     solr_collection.commit()
 
 
@@ -193,6 +197,12 @@ def parse_args():
                         required=True,
                         metavar='nexustiles')
 
+    parser.add_argument('--solrIdField',
+                        help='The name of the unique ID field for this collection.',
+                        required=False,
+                        default='solr_id_s',
+                        metavar='solr_id_s')
+
     parser.add_argument('--cassandra',
                         help='The hostname(s) or IP(s) of the Cassandra server(s).',
                         required=True,
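
The central change in this PR is in the time-series branch of NexusTileData in
CassandraProxy.pyx (second hunk above): rather than treating the tile's time as a
single scalar, the time array is deserialized with from_shaped_array, and the
variable data is expanded into a dense (time, lat, lon) masked cube in which only
the "diagonal" cells for the matched lat/lon pairs carry data. The following is a
minimal standalone sketch of that reshaping; the array sizes and sample values are
hypothetical and chosen only for illustration.

    import numpy as np

    # Hypothetical dimensions for illustration only: 3 time steps and 4
    # matched lat/lon points (one observation per point per time step).
    time_data = np.array([0.0, 3600.0, 7200.0])
    latitude_data = np.array([10.0, 10.5, 11.0, 11.5])
    longitude_data = np.array([-120.0, -119.5, -119.0, -118.5])

    # Stand-in for the deserialized time_series_tile.variable_data:
    # shape (len(time_data), len(latitude_data)).
    time_series_tile_data = np.ma.masked_invalid(
        np.arange(len(time_data) * len(latitude_data), dtype=float)
          .reshape(len(time_data), len(latitude_data)))

    # Expand to a dense (time, lat, lon) cube that is masked everywhere
    # except the "diagonal" cells holding the matched lat/lon pairs,
    # mirroring the reshaping added to CassandraProxy.pyx above.
    reshaped_array = np.ma.masked_all(
        (len(time_data), len(latitude_data), len(longitude_data)))
    idx = np.arange(len(latitude_data))
    reshaped_array[:, idx, idx] = time_series_tile_data

    print(reshaped_array.shape)     # (3, 4, 4)
    print(reshaped_array[0, 0, 0])  # 0.0
    print(reshaped_array[0, 1, 0])  # -- (masked)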


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services