You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/03/29 09:57:35 UTC

(tika) branch TIKA-4181-grpc updated: add bidi streaming

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-4181-grpc by this push:
     new cd397efe9 add bidi streaming
cd397efe9 is described below

commit cd397efe9f617688054df5f63e8f68b164a7c321
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Fri Mar 29 04:57:29 2024 -0500

    add bidi streaming
---
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 120 +++++++++++++--------
 tika-pipes/tika-grpc/src/main/proto/tika.proto     |   2 +
 .../org/apache/tika/pipes/grpc/TikaClient.java     |  25 ++++-
 .../apache/tika/pipes/grpc/TikaGrpcServerTest.java | 104 ++++++++++++++++--
 tika-pipes/tika-grpc/src/test/resources/log4j2.xml |  32 ++++++
 5 files changed, 225 insertions(+), 58 deletions(-)

diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index f162e235b..c8e567460 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -36,6 +36,9 @@ import javax.xml.transform.stream.StreamResult;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
@@ -55,6 +57,7 @@ import org.apache.tika.UpdateFetcherReply;
 import org.apache.tika.UpdateFetcherRequest;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.Param;
+import org.apache.tika.config.TikaConfigSerializer;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
@@ -67,6 +70,8 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.config.AbstractConfig;
 
 class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
+    // Logger for this service. NOTE(review): the TikaConfigSerializer import may now be unused.
+    private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerImpl.class);
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     /**
      * FetcherID is key, The pair is the Fetcher object and the Metadata
@@ -125,6 +129,138 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         }
     }
 
+    /**
+     * Server-streaming variant of {@link #fetchAndParse}: streams one reply per entry in the
+     * parse result's metadata list and then completes the stream.
+     */
+    @Override
+    public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
+                                                 StreamObserver<FetchAndParseReply> responseObserver) {
+        // Without onCompleted() the call never ends, so a blocking client iterator would hang.
+        if (fetchAndParseImpl(request, responseObserver)) {
+            responseObserver.onCompleted();
+        }
+    }
+
+    /**
+     * Bidirectional-streaming variant: each incoming request is fetched and parsed as it
+     * arrives and its replies are streamed back. The response stream is completed when the
+     * client completes its request stream. If one request fails, the call is terminated with
+     * an error status and later requests on the same call are ignored.
+     */
+    @Override
+    public StreamObserver<FetchAndParseRequest> fetchAndParseBiDirectionalStreaming(
+            StreamObserver<FetchAndParseReply> responseObserver) {
+        return new StreamObserver<>() {
+            // Set once fetchAndParseImpl has already closed the call via onError(). gRPC does
+            // not invoke one call's inbound callbacks concurrently, so a plain field suffices.
+            private boolean failed;
+
+            @Override
+            public void onNext(FetchAndParseRequest fetchAndParseRequest) {
+                if (failed) {
+                    return;
+                }
+                failed = !fetchAndParseImpl(fetchAndParseRequest, responseObserver);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOG.error("Parse error occurred", throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+                // After a failure the call is already closed; closing it again would be rejected.
+                if (!failed) {
+                    responseObserver.onCompleted();
+                }
+            }
+        };
+    }
+
+    /**
+     * Unary fetch-and-parse: sends the replies for {@code request} and completes the call.
+     * NOTE(review): a unary RPC may carry only one response, so a parse result with several
+     * metadata entries makes fetchAndParseImpl call onNext() more than once here -- confirm.
+     */
+    @Override
+    public void fetchAndParse(FetchAndParseRequest request,
+                              StreamObserver<FetchAndParseReply> responseObserver) {
+        if (fetchAndParseImpl(request, responseObserver)) {
+            responseObserver.onCompleted();
+        }
+    }
+
+    /**
+     * Fetches the request's fetch key with the named fetcher, runs it through the pipes
+     * client and sends one reply per entry in the resulting metadata list. Each reply
+     * carries the request's fetch key plus, for every metadata name, the value returned by
+     * {@link Metadata#get(String)} when it is non-null.
+     *
+     * <p>This method never completes {@code responseObserver}; the caller decides when the
+     * stream ends. On failure it terminates the call with {@code onError} and returns
+     * {@code false}, after which the caller must not call {@code onCompleted()}.
+     *
+     * @return {@code true} if all replies were sent, {@code false} if the call was failed
+     */
+    private boolean fetchAndParseImpl(FetchAndParseRequest request,
+                                      StreamObserver<FetchAndParseReply> responseObserver) {
+        AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+        if (fetcher == null) {
+            responseObserver.onError(Status.NOT_FOUND.withDescription(
+                    "Could not find fetcher with name " + request.getFetcherName())
+                    .asRuntimeException());
+            return false;
+        }
+        // NOTE(review): the request metadata is collected here but never passed to the
+        // FetchEmitTuple below; confirm whether it should be.
+        Metadata tikaMetadata = new Metadata();
+        for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+            tikaMetadata.add(entry.getKey(), entry.getValue());
+        }
+        PipesResult pipesResult;
+        try {
+            pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                    new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        } catch (IOException e) {
+            responseObserver.onError(Status.INTERNAL
+                    .withDescription("Failed to process " + request.getFetchKey())
+                    .withCause(e)
+                    .asRuntimeException());
+            return false;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            responseObserver.onError(Status.CANCELLED
+                    .withDescription("Interrupted while processing " + request.getFetchKey())
+                    .withCause(e)
+                    .asRuntimeException());
+            return false;
+        }
+        if (pipesResult.getEmitData() == null) {
+            // Presumably failure results (e.g. timeouts) carry no emit data -- confirm.
+            responseObserver.onError(Status.INTERNAL
+                    .withDescription("No parse output for " + request.getFetchKey()
+                            + " (pipes status " + pipesResult.getStatus() + ")")
+                    .asRuntimeException());
+            return false;
+        }
+        for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
+            FetchAndParseReply.Builder fetchReplyBuilder =
+                    FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
+            for (String name : metadata.names()) {
+                String value = metadata.get(name);
+                // Protobuf map fields reject null values, so those entries are skipped.
+                if (value != null) {
+                    fetchReplyBuilder.putFields(name, value);
+                }
+            }
+            responseObserver.onNext(fetchReplyBuilder.build());
+        }
+        return true;
+    }
+
     @SuppressWarnings("raw")
     @Override
     public void createFetcher(CreateFetcherRequest request,
@@ -163,17 +235,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             }
             fetchers.put(name, abstractFetcher);
             fetcherConfigs.put(name, configObject);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
-        } catch (TikaConfigException e) {
+        } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException |
+                 IllegalAccessException | InvocationTargetException | TikaConfigException e) {
+            // Any of the checked failures above is rethrown wrapped in an unchecked exception.
             throw new RuntimeException(e);
         }
     }
@@ -186,41 +249,6 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         return tikaParamsMap;
     }
 
-    @Override
-    public void fetchAndParse(FetchAndParseRequest request,
-                              StreamObserver<FetchAndParseReply> responseObserver) {
-        AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
-        if (fetcher == null) {
-            throw new RuntimeException(
-                    "Could not find fetcher with name " + request.getFetcherName());
-        }
-        Metadata tikaMetadata = new Metadata();
-        for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
-            tikaMetadata.add(entry.getKey(), entry.getValue());
-        }
-        try {
-            PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
-                    new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
-                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
-            for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
-                FetchAndParseReply.Builder fetchReplyBuilder =
-                        FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
-                for (String name : metadata.names()) {
-                    String value = metadata.get(name);
-                    if (value != null) {
-                        fetchReplyBuilder.putFields(name, value);
-                    }
-                }
-                responseObserver.onNext(fetchReplyBuilder.build());
-            }
-            responseObserver.onCompleted();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
     @Override
     public void updateFetcher(UpdateFetcherRequest request,
                               StreamObserver<UpdateFetcherReply> responseObserver) {
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
index 3095a0779..3c25ec7dd 100644
--- a/tika-pipes/tika-grpc/src/main/proto/tika.proto
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -27,6 +27,10 @@ service Tika {
   rpc ListFetchers(ListFetchersRequest) returns (ListFetchersReply) {}
   rpc DeleteFetcher(DeleteFetcherRequest) returns (DeleteFetcherReply) {}
   rpc FetchAndParse(FetchAndParseRequest) returns (FetchAndParseReply) {}
+  // Same request as FetchAndParse, but the replies are returned as a stream.
   rpc FetchAndParseServerSideStreaming(FetchAndParseRequest) returns (stream FetchAndParseReply) {}
+  // Streams requests in and replies out over a single call.
   rpc FetchAndParseBiDirectionalStreaming(stream FetchAndParseRequest) returns (stream FetchAndParseReply) {}
 }
 
 message CreateFetcherRequest {
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
index d74c7d23b..32ad81326 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -1,9 +1,10 @@
 /*
- * Copyright 2015 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,6 +16,7 @@
  */
 package org.apache.tika.pipes.grpc;
 
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
@@ -68,6 +70,24 @@ public class TikaClient {
         logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
     }
 
+    /**
+     * Calls the server-streaming FetchAndParse RPC and logs every reply as it arrives.
+     * RPC failures are logged as warnings rather than thrown.
+     */
+    public void fetchAndParseServerSideStreaming(FetchAndParseRequest fetchAndParseRequest) {
+        try {
+            // A blocking server-streaming call returns a lazy iterator: RPC errors surface from
+            // hasNext()/next(), not from the call itself, so the whole loop must be guarded.
+            Iterator<FetchAndParseReply> fetchReplies =
+                    blockingStub.fetchAndParseServerSideStreaming(fetchAndParseRequest);
+            while (fetchReplies.hasNext()) {
+                logger.info("Fetch reply - tika parsed metadata: " + fetchReplies.next());
+            }
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         if (args.length != 1) {
             System.err.println(
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index 165185ab9..82f7aa038 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -1,9 +1,10 @@
 /*
- * Copyright 2016 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,10 +16,19 @@
  */
 package org.apache.tika.pipes.grpc;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 import com.asarkar.grpc.test.GrpcCleanupExtension;
 import com.asarkar.grpc.test.Resources;
@@ -26,16 +36,25 @@ import io.grpc.ManagedChannel;
 import io.grpc.Server;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.tika.CreateFetcherReply;
 import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
 import org.apache.tika.TikaGrpc;
 import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
 
 @ExtendWith(GrpcCleanupExtension.class)
 public class TikaGrpcServerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerTest.class);
+    public static final int NUM_TEST_DOCS = 50; // HTML files generated by testBiStream
 
     @Test
     public void testTikaCreateFetcher(Resources resources) throws Exception {
@@ -60,6 +79,93 @@ public class TikaGrpcServerTest {
                 .putParams("basePath", targetFolder).putParams("extractFileSystemMetadata", "true")
                 .build());
 
-        assertEquals(fetcherId, reply.getMessage());
+        Assertions.assertEquals(fetcherId, reply.getMessage());
+    }
+
+    /**
+     * Streams {@link #NUM_TEST_DOCS} generated HTML files through the bidirectional
+     * FetchAndParse RPC and checks that no stream error is reported and that one reply is
+     * received per file.
+     */
+    @Test
+    public void testBiStream(Resources resources) throws Exception {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        Server server = InProcessServerBuilder
+                .forName(serverName)
+                .directExecutor()
+                .addService(new TikaGrpcServerImpl("tika-config.xml")).build().start();
+        resources.register(server, Duration.ofSeconds(10));
+
+        ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+        resources.register(channel, Duration.ofSeconds(10));
+        TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(channel);
+        TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel);
+
+        String fetcherId = "fetcherIdHere";
+        String targetFolder = new File("target").getAbsolutePath();
+        CreateFetcherReply reply = blockingStub.createFetcher(CreateFetcherRequest.newBuilder().setName(fetcherId)
+                .setFetcherClass(FileSystemFetcher.class.getName())
+                .putParams("basePath", targetFolder).putParams("extractFileSystemMetadata", "true")
+                .build());
+
+        Assertions.assertEquals(fetcherId, reply.getMessage());
+
+        List<FetchAndParseReply> fetchAndParseReplies =
+                Collections.synchronizedList(new ArrayList<>());
+        List<Throwable> streamErrors = Collections.synchronizedList(new ArrayList<>());
+
+        StreamObserver<FetchAndParseReply> replyStreamObserver = new StreamObserver<>() {
+            @Override
+            public void onNext(FetchAndParseReply fetchAndParseReply) {
+                LOG.info("Fetched {} with metadata {}", fetchAndParseReply.getFetchKey(),
+                        fetchAndParseReply.getFieldsMap());
+                fetchAndParseReplies.add(fetchAndParseReply);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOG.error("Fetched error found", throwable);
+                streamErrors.add(throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+                LOG.info("Stream completed");
+            }
+        };
+
+        StreamObserver<FetchAndParseRequest> requestStreamObserver =
+                tikaStub.fetchAndParseBiDirectionalStreaming(replyStreamObserver);
+
+        // MM is month-of-year and mm is minute-of-hour; the UUID suffix keeps the name unique.
+        File testDocumentFolder = new File("target/" + DateTimeFormatter.ofPattern(
+                "yyyy_MM_dd_HH_mm_ssSSS").format(LocalDateTime.now()) + "-" + UUID.randomUUID());
+        assertTrue(testDocumentFolder.mkdir());
+        try {
+            for (int i = 0; i < NUM_TEST_DOCS; ++i) {
+                File testFile = new File(testDocumentFolder, "test-" + i + ".html");
+                FileUtils.writeStringToFile(testFile, "<html><body>test " + i + "</body></html>",
+                        StandardCharsets.UTF_8);
+            }
+            File[] testDocuments = testDocumentFolder.listFiles();
+            assertNotNull(testDocuments);
+            for (File testDocument : testDocuments) {
+                requestStreamObserver.onNext(FetchAndParseRequest.newBuilder()
+                        .setFetcherName(fetcherId)
+                        .setFetchKey(testDocument.getAbsolutePath())
+                        .build());
+            }
+            requestStreamObserver.onCompleted();
+
+            // Server and channel both use directExecutor(), so requests are presumably handled
+            // synchronously inside onNext(); TODO confirm, otherwise await onCompleted() first.
+            assertTrue(streamErrors.isEmpty(), () -> "Stream reported errors: " + streamErrors);
+            assertEquals(NUM_TEST_DOCS, fetchAndParseReplies.size());
+        } finally {
+            FileUtils.deleteDirectory(testDocumentFolder);
+        }
     }
 }
diff --git a/tika-pipes/tika-grpc/src/test/resources/log4j2.xml b/tika-pipes/tika-grpc/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..c88e66e99
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_ERR">
+      <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file