You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/03/24 21:45:39 UTC
[tika] branch main updated: TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 2945393 TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.
2945393 is described below
commit 294539352773d69b675e9048523607ba55792a87
Author: tallison <ta...@apache.org>
AuthorDate: Thu Mar 24 17:45:23 2022 -0400
TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.
---
tika-parent/pom.xml | 7 +
.../tika-emitters/tika-emitter-az-blob/pom.xml | 3 +-
.../tika-fetchers/tika-fetcher-az-blob/pom.xml | 3 +-
tika-pipes/tika-pipes-iterators/pom.xml | 1 +
.../pom.xml | 11 +-
.../pipesiterator/azblob/AZBlobPipesIterator.java | 155 +++++++++++++++++++++
.../azblob/TestAZBlobPipesIterator.java | 99 +++++++++++++
.../src/test/resources/log4j2.xml | 32 +++++
.../tika-pipes-iterator-gcs/pom.xml | 2 +-
.../src/test/resources/log4j.properties | 22 ---
.../src/test/resources/log4j2.xml | 32 +++++
11 files changed, 334 insertions(+), 33 deletions(-)
diff --git a/tika-parent/pom.xml b/tika-parent/pom.xml
index e9f7579..04148f1 100644
--- a/tika-parent/pom.xml
+++ b/tika-parent/pom.xml
@@ -398,6 +398,13 @@
<version>0.9.1.2</version>
</dependency>
<dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ <!-- 12.15.0 introduces reams of convergence errors with
+ all sorts of netty components -->
+ <version>12.14.4</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
index 57a83f7..1857fe6 100644
--- a/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
@@ -46,7 +46,6 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
- <version>12.14.4</version>
</dependency>
</dependencies>
@@ -58,7 +57,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.emitter.gcs</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.emitter.azblob</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
index 44eddcf..4fe59c8 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
@@ -38,7 +38,6 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
- <version>12.14.4</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -63,7 +62,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.fetcher.gcs</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.fetcher.azblob</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml b/tika-pipes/tika-pipes-iterators/pom.xml
index 9f7b41e..de1ed74 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -39,6 +39,7 @@
<module>tika-pipes-iterator-s3</module>
<module>tika-pipes-iterator-solr</module>
<module>tika-pipes-iterator-gcs</module>
+ <module>tika-pipes-iterator-az-blob</module>
</modules>
<scm>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
similarity index 93%
copy from tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
copy to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
index 50c8445..cc9fb00 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
@@ -26,9 +26,9 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-pipes-iterator-gcs</artifactId>
+ <artifactId>tika-pipes-iterator-az-blob</artifactId>
- <name>Apache Tika Pipes Iterator - Google Cloud Storage</name>
+ <name>Apache Tika Pipes Iterator - Azure Blob Storage</name>
<url>https://tika.apache.org/</url>
<dependencies>
@@ -39,9 +39,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.google.cloud</groupId>
- <artifactId>google-cloud-storage</artifactId>
- <version>${google.cloud.version}</version>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
</dependency>
</dependencies>
<build>
@@ -52,7 +51,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.azblob</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
new file mode 100644
index 0000000..b207800
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.pipesiterator.azblob;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+import org.apache.tika.utils.StringUtils;
+
+/** Pipes iterator over the blobs of one Azure Blob Storage container; SAS-token auth only. */
+public class AZBlobPipesIterator extends PipesIterator implements Initializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AZBlobPipesIterator.class);
+
+    private BlobServiceClient blobServiceClient;
+    private BlobContainerClient blobContainerClient;
+    private String prefix = "";
+    private String container = "";
+    private String sasToken;
+    private String endpoint;
+    private long timeoutMillis = 360000;
+
+    @Field
+    public void setSasToken(String sasToken) {
+        this.sasToken = sasToken;
+    }
+
+    @Field
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Field
+    public void setContainer(String container) {
+        this.container = container;
+    }
+
+    /** Sets the blob-name prefix to list; null is treated as "no prefix". */
+    @Field
+    public void setPrefix(String prefix) {
+        if (prefix == null) {
+            this.prefix = "";
+        } else if (prefix.endsWith("/")) {
+            //strip final "/" if it exists
+            this.prefix = prefix.substring(0, prefix.length() - 1);
+        } else {
+            this.prefix = prefix;
+        }
+    }
+
+    /** Lists the container's blobs and hands a tuple for each non-empty one to tryToAdd. */
+    @Override
+    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+        String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
+        long start = System.currentTimeMillis();
+        int count = 0;
+        HandlerConfig handlerConfig = getHandlerConfig();
+
+        //one set of options for both cases; the prefix is only applied when configured
+        ListBlobsOptions options = new ListBlobsOptions()
+                .setDetails(
+                        new BlobListDetails()
+                                .setRetrieveDeletedBlobs(false)
+                                .setRetrieveMetadata(false)
+                                .setRetrieveSnapshots(false));
+        if (!StringUtils.isBlank(prefix)) {
+            options.setPrefix(prefix);
+        }
+        PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(options,
+                Duration.of(timeoutMillis, ChronoUnit.MILLIS));
+
+        for (BlobItem blob : blobs) {
+            //tried blob.isPrefix() and got NPE ... user error?
+            if (blob == null || blob.getProperties() == null) {
+                continue;
+            }
+            //boxed Long: null-check before comparing so unboxing cannot throw an NPE
+            Long contentLength = blob.getProperties().getContentLength();
+            if (contentLength == null || contentLength == 0L) {
+                continue;
+            }
+            long elapsed = System.currentTimeMillis() - start;
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("adding ({}) {} in {} ms", count, blob.getName(), elapsed);
+            }
+            //TODO -- extract metadata about content length etc from properties
+            tryToAdd(new FetchEmitTuple(blob.getName(), new FetchKey(fetcherName,
+                    blob.getName()),
+                    new EmitKey(emitterName, blob.getName()), new Metadata(), handlerConfig,
+                    getOnParseException()));
+            count++;
+        }
+        long elapsed = System.currentTimeMillis() - start;
+        LOGGER.info("finished enqueuing {} files in {} ms", count, elapsed);
+    }
+
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        //TODO -- allow authentication via other methods
+        blobServiceClient = new BlobServiceClientBuilder()
+                .endpoint(endpoint)
+                .sasToken(sasToken)
+                .buildClient();
+        blobContainerClient = blobServiceClient.getBlobContainerClient(container);
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("sasToken", this.sasToken);
+        mustNotBeEmpty("endpoint", this.endpoint);
+        mustNotBeEmpty("container", this.container);
+    }
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java
new file mode 100644
index 0000000..f090ac2
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.pipesiterator.azblob;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+
+@Disabled("turn into an actual unit test")
+public class TestAZBlobPipesIterator {
+
+    @Test
+    public void testSimple() throws Exception {
+        AZBlobPipesIterator it = new AZBlobPipesIterator();
+        it.setContainer("");
+        it.setEndpoint("");
+        it.setSasToken("");
+        it.initialize(Collections.emptyMap());
+        int numConsumers = 2;
+        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
+
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
+        ExecutorCompletionService<Integer> c = new ExecutorCompletionService<>(es);
+        for (int i = 0; i < numConsumers; i++) {
+            c.submit(new MockFetcher(queue));
+        }
+        //put() blocks when the bounded queue is full; offer() would silently drop the element
+        for (FetchEmitTuple t : it) {
+            queue.put(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.put(PipesIterator.COMPLETED_SEMAPHORE);
+        }
+        int completed = 0;
+        try {
+            for (int i = 0; i < numConsumers; i++) {
+                Future<Integer> f = c.take();
+                completed += f.get();
+            }
+        } finally {
+            es.shutdownNow();
+        }
+        assertEquals(1, completed);
+    }
+
+    /** Drains the queue until the completion marker arrives and reports how many tuples it saw. */
+    private static class MockFetcher implements Callable<Integer> {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
+        private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            while (true) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+                if (t == null) {
+                    //poll timed out; fail instead of collecting nulls forever
+                    throw new IllegalStateException("timed out waiting for a tuple");
+                }
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
+                    return pairs.size();
+                }
+                pairs.add(t);
+            }
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c88e66e
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_ERR">
+ <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
index 50c8445..551a053 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
@@ -52,7 +52,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.gcs</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
deleted file mode 100644
index 2b2da1a..0000000
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#info,debug, error,fatal ...
-log4j.rootLogger=info,stderr
-#console
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c88e66e
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_ERR">
+ <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file