You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/03/29 09:57:35 UTC
(tika) branch TIKA-4181-grpc updated: add bidi streaming
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4181-grpc by this push:
new cd397efe9 add bidi streaming
cd397efe9 is described below
commit cd397efe9f617688054df5f63e8f68b164a7c321
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Fri Mar 29 04:57:29 2024 -0500
add bidi streaming
---
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 120 +++++++++++++--------
tika-pipes/tika-grpc/src/main/proto/tika.proto | 2 +
.../org/apache/tika/pipes/grpc/TikaClient.java | 25 ++++-
.../apache/tika/pipes/grpc/TikaGrpcServerTest.java | 104 ++++++++++++++++--
tika-pipes/tika-grpc/src/test/resources/log4j2.xml | 32 ++++++
5 files changed, 225 insertions(+), 58 deletions(-)
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index f162e235b..c8e567460 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -36,6 +36,8 @@ import javax.xml.transform.stream.StreamResult;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
@@ -55,6 +57,7 @@ import org.apache.tika.UpdateFetcherReply;
import org.apache.tika.UpdateFetcherRequest;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.Param;
+import org.apache.tika.config.TikaConfigSerializer;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
@@ -67,6 +70,7 @@ import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.config.AbstractConfig;
class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
+    // Log under this service's own category (was mistakenly TikaConfigSerializer's).
+    private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerImpl.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* FetcherID is key, The pair is the Fetcher object and the Metadata
@@ -125,6 +129,74 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
}
}
+    /**
+     * Server-streaming variant of fetch-and-parse: one request in, a stream of replies out.
+     * Delegates to {@code fetchAndParseImpl}, which emits the replies through {@code onNext}
+     * but does not close the stream, so the stream is completed here.
+     *
+     * @param request          names the fetcher and the fetch key to process
+     * @param responseObserver receives the replies followed by the completion signal
+     */
+    @Override
+    public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
+                                                 StreamObserver<FetchAndParseReply> responseObserver) {
+        fetchAndParseImpl(request, responseObserver);
+        // Without this the RPC never terminates and a client waiting for end-of-stream blocks.
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Bidirectional-streaming variant of fetch-and-parse. Every request received on the
+     * returned observer is handed to {@code fetchAndParseImpl}, which writes its replies to
+     * {@code responseObserver}; the reply stream is completed once the request stream completes.
+     *
+     * @param responseObserver sink for the replies
+     * @return observer that consumes the incoming requests
+     */
+    @Override
+    public StreamObserver<FetchAndParseRequest> fetchAndParseBiDirectionalStreaming(
+            StreamObserver<FetchAndParseReply> responseObserver) {
+        StreamObserver<FetchAndParseRequest> requestObserver = new StreamObserver<>() {
+            @Override
+            public void onNext(FetchAndParseRequest incoming) {
+                fetchAndParseImpl(incoming, responseObserver);
+            }
+
+            @Override
+            public void onError(Throwable cause) {
+                // Only logged; nothing is sent to responseObserver from here.
+                LOG.error("Parse error occurred", cause);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onCompleted();
+            }
+        };
+        return requestObserver;
+    }
+
+ /**
+ * Unary fetch-and-parse RPC: runs the request through {@code fetchAndParseImpl} and then
+ * completes the response.
+ * NOTE(review): fetchAndParseImpl calls onNext once per returned Metadata object, so this
+ * may emit zero or several replies before completing -- confirm that suits a unary call.
+ *
+ * @param request names the fetcher and the fetch key to process
+ * @param responseObserver receives the reply and the completion signal
+ */
+ @Override
+ public void fetchAndParse(FetchAndParseRequest request,
+ StreamObserver<FetchAndParseReply> responseObserver) {
+ fetchAndParseImpl(request, responseObserver);
+ responseObserver.onCompleted();
+ }
+
+
+    /**
+     * Shared implementation behind the fetch-and-parse RPCs. Looks up the requested fetcher,
+     * runs the fetch key through the pipes client and emits one {@link FetchAndParseReply}
+     * per returned {@link Metadata} object. Completing the response stream is left to the
+     * caller.
+     *
+     * @param request          supplies the fetcher name, the fetch key and request metadata
+     * @param responseObserver receives one {@code onNext} call per returned metadata object
+     * @throws RuntimeException if no fetcher is registered under the requested name, if
+     *                          processing fails with an {@link IOException}, or if the
+     *                          thread is interrupted while processing
+     */
+    private void fetchAndParseImpl(FetchAndParseRequest request,
+                                   StreamObserver<FetchAndParseReply> responseObserver) {
+        AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+        if (fetcher == null) {
+            throw new RuntimeException(
+                    "Could not find fetcher with name " + request.getFetcherName());
+        }
+        // NOTE(review): the request metadata is copied here but never handed to the
+        // FetchEmitTuple below -- presumably it was meant to be; confirm and wire it through.
+        Metadata tikaMetadata = new Metadata();
+        for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+            tikaMetadata.add(entry.getKey(), entry.getValue());
+        }
+        try {
+            PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                    new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+            for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
+                FetchAndParseReply.Builder fetchReplyBuilder =
+                        FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
+                for (String name : metadata.names()) {
+                    // Only the value returned by get(name) is copied; null values are skipped.
+                    String value = metadata.get(name);
+                    if (value != null) {
+                        fetchReplyBuilder.putFields(name, value);
+                    }
+                }
+                responseObserver.onNext(fetchReplyBuilder.build());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag, then fail like the IOException branch instead of
+            // returning normally, which callers could not tell apart from an empty result.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
@SuppressWarnings("raw")
@Override
public void createFetcher(CreateFetcherRequest request,
@@ -163,17 +235,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
}
fetchers.put(name, abstractFetcher);
fetcherConfigs.put(name, configObject);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- } catch (TikaConfigException e) {
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
+ InvocationTargetException | NoSuchMethodException | TikaConfigException e) {
throw new RuntimeException(e);
}
}
@@ -186,41 +249,6 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
return tikaParamsMap;
}
- @Override
- public void fetchAndParse(FetchAndParseRequest request,
- StreamObserver<FetchAndParseReply> responseObserver) {
- AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
- if (fetcher == null) {
- throw new RuntimeException(
- "Could not find fetcher with name " + request.getFetcherName());
- }
- Metadata tikaMetadata = new Metadata();
- for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
- tikaMetadata.add(entry.getKey(), entry.getValue());
- }
- try {
- PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
- new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
- FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
- for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
- FetchAndParseReply.Builder fetchReplyBuilder =
- FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
- for (String name : metadata.names()) {
- String value = metadata.get(name);
- if (value != null) {
- fetchReplyBuilder.putFields(name, value);
- }
- }
- responseObserver.onNext(fetchReplyBuilder.build());
- }
- responseObserver.onCompleted();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
@Override
public void updateFetcher(UpdateFetcherRequest request,
StreamObserver<UpdateFetcherReply> responseObserver) {
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
index 3095a0779..3c25ec7dd 100644
--- a/tika-pipes/tika-grpc/src/main/proto/tika.proto
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -27,6 +27,8 @@ service Tika {
rpc ListFetchers(ListFetchersRequest) returns (ListFetchersReply) {}
rpc DeleteFetcher(DeleteFetcherRequest) returns (DeleteFetcherReply) {}
rpc FetchAndParse(FetchAndParseRequest) returns (FetchAndParseReply) {}
+ rpc FetchAndParseServerSideStreaming(FetchAndParseRequest) returns (stream FetchAndParseReply) {}
+ rpc FetchAndParseBiDirectionalStreaming(stream FetchAndParseRequest) returns (stream FetchAndParseReply) {}
}
message CreateFetcherRequest {
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
index d74c7d23b..32ad81326 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -1,9 +1,10 @@
/*
- * Copyright 2015 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,6 +16,7 @@
*/
package org.apache.tika.pipes.grpc;
+import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -68,6 +70,19 @@ public class TikaClient {
logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
}
+    /**
+     * Calls the server-streaming fetch-and-parse RPC and logs every reply it returns.
+     * A failed RPC is logged as a warning and otherwise ignored.
+     *
+     * @param fetchAndParseRequest the request to send
+     */
+    public void fetchAndParseServerSideStreaming(FetchAndParseRequest fetchAndParseRequest) {
+        try {
+            Iterator<FetchAndParseReply> fetchReply =
+                    blockingStub.fetchAndParseServerSideStreaming(fetchAndParseRequest);
+            // The blocking stub returns a lazy iterator, so a failed call surfaces as a
+            // StatusRuntimeException from hasNext()/next(); the loop must sit inside the try.
+            while (fetchReply.hasNext()) {
+                logger.info("Fetch reply - tika parsed metadata: " + fetchReply.next());
+            }
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+        }
+    }
+
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println(
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index 165185ab9..82f7aa038 100644
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -1,9 +1,10 @@
/*
- * Copyright 2016 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,10 +16,19 @@
*/
package org.apache.tika.pipes.grpc;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
import com.asarkar.grpc.test.GrpcCleanupExtension;
import com.asarkar.grpc.test.Resources;
@@ -26,16 +36,25 @@ import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tika.CreateFetcherReply;
import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
import org.apache.tika.TikaGrpc;
import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
@ExtendWith(GrpcCleanupExtension.class)
public class TikaGrpcServerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TikaGrpcServerTest.class);
+ public static final int NUM_TEST_DOCS = 50;
@Test
public void testTikaCreateFetcher(Resources resources) throws Exception {
@@ -60,6 +79,77 @@ public class TikaGrpcServerTest {
.putParams("basePath", targetFolder).putParams("extractFileSystemMetadata", "true")
.build());
- assertEquals(fetcherId, reply.getMessage());
+ Assertions.assertEquals(fetcherId, reply.getMessage());
+ }
+
+    /**
+     * Writes {@code NUM_TEST_DOCS} small HTML files, sends one request per file through the
+     * bidirectional streaming RPC of an in-process server, and expects
+     * {@code NUM_TEST_DOCS} replies.
+     */
+    @Test
+    public void testBiStream(Resources resources) throws Exception {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        Server server = InProcessServerBuilder
+                .forName(serverName)
+                .directExecutor()
+                .addService(new TikaGrpcServerImpl("tika-config.xml")).build().start();
+        resources.register(server, Duration.ofSeconds(10));
+
+        ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+        resources.register(channel, Duration.ofSeconds(10));
+        TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(channel);
+        TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel);
+
+        String fetcherId = "fetcherIdHere";
+        String targetFolder = new File("target").getAbsolutePath();
+        CreateFetcherReply reply = blockingStub.createFetcher(CreateFetcherRequest.newBuilder().setName(fetcherId)
+                .setFetcherClass(FileSystemFetcher.class.getName())
+                .putParams("basePath", targetFolder).putParams("extractFileSystemMetadata", "true")
+                .build());
+
+        Assertions.assertEquals(fetcherId, reply.getMessage());
+
+        List<FetchAndParseReply> fetchAndParseReplys =
+                Collections.synchronizedList(new ArrayList<>());
+
+        StreamObserver<FetchAndParseReply> replyStreamObserver = new StreamObserver<>() {
+            @Override
+            public void onNext(FetchAndParseReply fetchAndParseReply) {
+                LOG.info("Fetched {} with metadata {}", fetchAndParseReply.getFetchKey(),
+                        fetchAndParseReply.getFieldsMap());
+                fetchAndParseReplys.add(fetchAndParseReply);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOG.error("Fetched error found", throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+                LOG.info("Stream completed");
+            }
+        };
+
+        StreamObserver<FetchAndParseRequest> requestStreamObserver =
+                tikaStub.fetchAndParseBiDirectionalStreaming(replyStreamObserver);
+
+        // Pattern letters: MM is month-of-year, mm is minute-of-hour (they were swapped).
+        File testDocumentFolder = new File("target/" + DateTimeFormatter.ofPattern(
+                "yyyy_MM_dd_HH_mm_ssSSS").format(LocalDateTime.now()) + "-" + UUID.randomUUID());
+        assertTrue(testDocumentFolder.mkdir());
+        for (int i = 0; i < NUM_TEST_DOCS; ++i) {
+            File testFile = new File(testDocumentFolder, "test-" + i + ".html");
+            FileUtils.writeStringToFile(testFile, "<html><body>test " + i + "</body></html>", StandardCharsets.UTF_8);
+        }
+        File[] testDocuments = testDocumentFolder.listFiles();
+        assertNotNull(testDocuments);
+        for (File testDocument : testDocuments) {
+            requestStreamObserver.onNext(FetchAndParseRequest.newBuilder()
+                    .setFetcherName(fetcherId)
+                    .setFetchKey(testDocument.getAbsolutePath())
+                    .build());
+        }
+        requestStreamObserver.onCompleted();
+
+        // NOTE(review): nothing waits for the reply stream here -- presumably safe because
+        // both server and channel use directExecutor(); confirm.
+        assertEquals(NUM_TEST_DOCS, fetchAndParseReplys.size());
+    }
}
diff --git a/tika-pipes/tika-grpc/src/test/resources/log4j2.xml b/tika-pipes/tika-grpc/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..c88e66e99
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!-- Test logging setup: the root logger passes INFO and above to a single console
+     appender that writes to standard error. -->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_ERR">
+ <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file