Posted to commits@sdap.apache.org by tl...@apache.org on 2021/07/12 17:21:31 UTC

[incubator-sdap-ingester] branch support_solr_bitnami created (now 4ae356b)

This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a change to branch support_solr_bitnami
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git.


      at 4ae356b  correct bug on max_concurrency param in docker

This branch includes the following new commits:

     new 4ae356b  correct bug on max_concurrency param in docker

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[incubator-sdap-ingester] 01/01: correct bug on max_concurrency param in docker

Posted by tl...@apache.org.

tloubrieu pushed a commit to branch support_solr_bitnami
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 4ae356b2b83d15733465dbb34bd8fb33dd16d264
Author: thomas loubrieu <th...@jpl.nasa.gov>
AuthorDate: Mon Jul 12 10:20:57 2021 -0700

    correct bug on max_concurrency param in docker
---
 .../services/CollectionWatcher.py                  |  3 ++
 granule_ingester/docker/Dockerfile                 |  4 ++-
 granule_ingester/docker/entrypoint.sh              |  5 ++-
 granule_ingester/granule_ingester/README.md        |  8 ++++-
 .../granule_ingester/pipeline/Pipeline.py          |  2 ++
 .../granule_ingester/writers/CassandraStore.py     |  7 ++--
 .../granule_ingester/writers/SolrStore.py          | 37 +++++++++++++++++++---
 7 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index b713f2d..e0cbe56 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -123,10 +123,13 @@ class CollectionWatcher:
 
     def _get_files_at_path(self, path: str) -> List[str]:
         if os.path.isfile(path):
+            logger.info("process collections path as file")
             return [path]
         elif os.path.isdir(path):
+            logger.info("process collection path as directory")
             return [f for f in glob(path + '/**', recursive=True) if os.path.isfile(f)]
         else:
+            logger.info("process collection path as file path regex")
             return [f for f in glob(path, recursive=True) if os.path.isfile(f)]
 
     async def _reload_and_reschedule(self):
diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 1e7aedd..6f9d525 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -22,4 +22,6 @@ RUN pip install boto3==1.16.10
 
 RUN apk del .build-deps
 
-ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
+USER 1001
+
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 662bd3d..9a5a046 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,5 +1,7 @@
 #!/bin/sh
 
+[[ ! -z "$MAX_THREADS" ]] && export MAX_THREADS_INT=`echo $MAX_THREADS | sed -e 's/^"//' -e 's/"$//'`
+
 python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
   $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
@@ -11,4 +13,5 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) \
-  $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
+  $([[ ! -z "$MAX_THREADS_INT" ]] && echo --max-threads=$MAX_THREADS_INT) \
+  --verbose
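
The sed line above is the fix named in the commit message: if MAX_THREADS
reaches the container wrapped in literal double quotes (e.g. '"8"', as can
happen with quoted helm/Kubernetes values), an integer option parser will
reject it. A minimal Python sketch of the failure mode, assuming main.py
parses --max-threads with argparse and type=int (that code is not shown in
this diff):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--max-threads', type=int)

    parser.parse_args(['--max-threads', '8'])        # OK: Namespace(max_threads=8)
    try:
        parser.parse_args(['--max-threads', '"8"'])  # literal quotes from the env
    except SystemExit:
        print('argparse rejects the quoted value')

    # the sed expression in entrypoint.sh strips the quotes much like:
    max_threads = '"8"'.strip('"')  # -> '8'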
diff --git a/granule_ingester/granule_ingester/README.md b/granule_ingester/granule_ingester/README.md
index 881461a..aace983 100644
--- a/granule_ingester/granule_ingester/README.md
+++ b/granule_ingester/granule_ingester/README.md
@@ -8,4 +8,10 @@ The custom code file would be copied into the ingestion pods via the helm chart
 Example: `KelvinToCelsiusProcessor`
 This processor checks the units of the saved variable.  If it is some form of Kelvin, it automatically converts all of the temperature measurements to Celsius by subtracting 273.15 from each data point.  The transformed data then replaces the default (untransformed) values and the processor returns the modified tile.
 
-#### TODO Add configuration option for unusual representations of temperature units.
\ No newline at end of file
+#### TODO Add configuration option for unusual representations of temperature units.
+
+
+## Building the Docker image
+From `incubator-sdap-ingester`, run:
+
+    $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester
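
For reference, the KelvinToCelsiusProcessor described in this README maps
onto the process(tile, dataset) interface seen in Pipeline.py. A
hypothetical sketch only (the unit lookup and tile data accessors are
illustrative, not the actual sdap API):

    import numpy as np

    KELVIN_UNITS = {'K', 'kelvin', 'Kelvin', 'degK'}

    class KelvinToCelsiusProcessor:
        def process(self, tile, dataset):
            # hypothetical accessor for the saved variable's units
            if dataset.variable_units in KELVIN_UNITS:
                data = np.asarray(tile.data, dtype=np.float64)
                # replace the untransformed values with Celsius
                tile.data = data - 273.15
            return tile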
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 86bf9c8..59f02a0 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -64,6 +64,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
+    logger.debug("start to process tile in worker")
     try:
         input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
         processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
@@ -82,6 +83,7 @@ def _recurse(processor_list: List[TileProcessor],
              input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
     if len(processor_list) == 0:
         return input_tile
+    logger.debug("start processor %s", processor_list[0].__class__.__name__)
     output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
     return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None
 
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index cb5232b..514d12f 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -64,7 +64,8 @@ class CassandraStore(DataStore):
 
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
-                          # load_balancing_policy=
+                          #load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
+                          protocol_version=4,
                           reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
                           default_retry_policy=RetryPolicy(),
                           auth_provider=auth_provider)
@@ -84,9 +85,11 @@ class CassandraStore(DataStore):
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
             prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            logger.debug("starting to updload tile %s data on cassandra", tile_id)
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, bytearray(serialized_tile_data)])
-        except NoHostAvailable:
+        except Exception as e:
+            logger.error("exception while uploading tile data on cassandra %s", e)
             raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
 
     @staticmethod
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index b753404..42ca066 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -50,13 +50,42 @@ class SolrStore(MetadataStore):
         self.log.setLevel(logging.DEBUG)
         self._solr = None
 
+    def _get_collections(self, zk, parent_nodes):
+        """
+            try to get list of collection from zookeper, on a list of candidate nodes,
+            return the first successful request result
+        """
+
+        try:
+            logger.info("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
+            return parent_nodes[0], zk.zk.get_children(parent_nodes[0])
+        except NoNodeError:
+            logger.info("solr configuration not found in node '%s'", parent_nodes[0])
+            if len(parent_nodes)>1:
+                return self._get_collections(zk, parent_nodes[1:])
+            else:
+                raise
+
+    def _set_solr_status(self, zk):
+        """ because of something not working right between zookeeper and solr
+            we need to  manually update the solr status on zookeeper
+            see https://github.com/django-haystack/pysolr/issues/189
+        """
+        collections = {}
+        parent_node, zk_collections = self._get_collections(
+            zk,
+            ['collections',
+             # 'solr/collections' applies with the bitnami/solr 0.3.3 helm chart deployment
+             'solr/collections'])
+
+        for c in zk_collections:
+            collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
+        zk.collections = collections
+
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
-            collections = {}
-            for c in zk.zk.get_children("collections"):
-                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
-            zk.collections = collections
+            self._set_solr_status(zk)
             return pysolr.SolrCloud(zk, self._collection, always_commit=True)
         elif self._solr_url:
             return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
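
For context, the fallback in _get_collections exists because deployments
root the Solr state at different ZooKeeper paths: stock SolrCloud keeps it
under 'collections', while the bitnami/solr 0.3.3 helm chart deployment
exposes the same data under 'solr/collections' (per the comment in the
diff). A usage sketch, with a placeholder ZooKeeper host and an
already-constructed SolrStore instance (its constructor arguments are not
shown in this diff):

    import pysolr

    zk = pysolr.ZooKeeper('zk-host:2181')
    # Tries 'collections' first, then falls back to 'solr/collections',
    # returning the parent node that answered plus its child collection names.
    parent_node, names = store._get_collections(zk, ['collections', 'solr/collections'])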