You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2020/03/22 15:40:57 UTC
[arrow] branch master updated: ARROW-8181: [Java][FlightRPC] Expose
transport error metadata
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 00266d1 ARROW-8181: [Java][FlightRPC] Expose transport error metadata
00266d1 is described below
commit 00266d1c5ccc85020fbeb24b07670476a43023d7
Author: Ryan Murray <ry...@dremio.com>
AuthorDate: Sun Mar 22 11:40:22 2020 -0400
ARROW-8181: [Java][FlightRPC] Expose transport error metadata
Add a class to act as a container for metadata (akin to gRPC `Metadata`) and expose this metadata on `FlightRuntimeException`.
Closes #6682 from rymurr/ARROW-8181 and squashes the following commits:
3fbcd2da2 <Ryan Murray> unify `FlightMetadata` and `CallHeaders`
85202263d <Ryan Murray> address code review from @lidavidm
eba658c1b <Ryan Murray> expose transport error metadata
Authored-by: Ryan Murray <ry...@dremio.com>
Signed-off-by: David Li <li...@gmail.com>
---
java/flight/flight-core/pom.xml | 7 ++
.../java/org/apache/arrow/flight/CallHeaders.java | 37 ++++++---
.../java/org/apache/arrow/flight/CallStatus.java | 27 +++++--
...tadataAdapter.java => ErrorFlightMetadata.java} | 59 ++++++++------
.../flight/grpc/ClientInterceptorAdapter.java | 4 +-
.../apache/arrow/flight/grpc/MetadataAdapter.java | 23 ++++++
.../org/apache/arrow/flight/grpc/StatusUtils.java | 35 +++++++-
.../org/apache/arrow/flight/TestErrorMetadata.java | 92 ++++++++++++++++++++++
8 files changed, 240 insertions(+), 44 deletions(-)
diff --git a/java/flight/flight-core/pom.xml b/java/flight/flight-core/pom.xml
index fb92467..d53bd8e 100644
--- a/java/flight/flight-core/pom.xml
+++ b/java/flight/flight-core/pom.xml
@@ -125,6 +125,13 @@
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.google.api.grpc</groupId>
+ <artifactId>proto-google-common-protos</artifactId>
+ <version>1.12.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<extensions>
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
index 3a47be1..5f272b6 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
@@ -20,32 +20,49 @@ package org.apache.arrow.flight;
import java.util.Set;
/**
- * A set of headers for a call (request or response).
- *
- * <p>Only text (ASCII) headers are supported.
+ * A set of metadata key value pairs for a call (request or response).
*/
public interface CallHeaders {
-
/**
- * Get the value of a header. If multiple values are present, then get the last one.
+ * Get the value of a metadata key. If multiple values are present, then get the last one.
*/
+ @Deprecated()
String get(String key);
/**
- * Get all values present for the given header.
+ * Get the value of a metadata key. If multiple values are present, then get the last one.
+ */
+ byte[] getByte(String key);
+
+ /**
+ * Get all values present for the given metadata key.
*/
+ @Deprecated
Iterable<String> getAll(String key);
/**
- * Insert a header with the given value.
+ * Get all values present for the given metadata key.
+ */
+ Iterable<byte[]> getAllByte(String key);
+
+ /**
+ * Insert a metadata pair with the given value.
*
- * <p>Duplicate headers are permitted.
+ * <p>Duplicate metadata are permitted.
*/
+ @Deprecated
void insert(String key, String value);
- /** Get a set of all the headers. */
+ /**
+ * Insert a metadata pair with the given value.
+ *
+ * <p>Duplicate metadata are permitted.
+ */
+ void insert(String key, byte[] value);
+
+ /** Get a set of all the metadata keys. */
Set<String> keys();
- /** Check whether the given header is present. */
+ /** Check whether the given metadata key is present. */
boolean containsKey(String key);
}
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
index c1d0c2a..5017317 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
@@ -36,6 +36,7 @@ public class CallStatus {
private final FlightStatusCode code;
private final Throwable cause;
private final String description;
+ private final ErrorFlightMetadata metadata;
public static final CallStatus UNKNOWN = FlightStatusCode.UNKNOWN.toStatus();
public static final CallStatus INTERNAL = FlightStatusCode.INTERNAL.toStatus();
@@ -56,10 +57,11 @@ public class CallStatus {
* @param cause An exception that resulted in this status (or null).
* @param description A description of the status (or null).
*/
- public CallStatus(FlightStatusCode code, Throwable cause, String description) {
+ public CallStatus(FlightStatusCode code, Throwable cause, String description, ErrorFlightMetadata metadata) {
this.code = Objects.requireNonNull(code);
this.cause = cause;
this.description = description == null ? "" : description;
+ this.metadata = metadata == null ? new ErrorFlightMetadata() : metadata;
}
/**
@@ -68,7 +70,7 @@ public class CallStatus {
* @param code The status code.
*/
public CallStatus(FlightStatusCode code) {
- this(code, /* no cause */ null, /* no description */ null);
+ this(code, /* no cause */ null, /* no description */ null, /* no metadata */ null);
}
/**
@@ -93,17 +95,31 @@ public class CallStatus {
}
/**
+ * Metadata associated with the exception.
+ */
+ public ErrorFlightMetadata metadata() {
+ return metadata;
+ }
+
+ /**
* Return a copy of this status with an error message.
*/
public CallStatus withDescription(String message) {
- return new CallStatus(code, cause, message);
+ return new CallStatus(code, cause, message, metadata);
}
/**
* Return a copy of this status with the given exception as the cause. This will not be sent over the wire.
*/
public CallStatus withCause(Throwable t) {
- return new CallStatus(code, t, description);
+ return new CallStatus(code, t, description, metadata);
+ }
+
+ /**
+ * Return a copy of this status with associated exception metadata.
+ */
+ public CallStatus withMetadata(ErrorFlightMetadata metadata) {
+ return new CallStatus(code, cause, description, metadata);
}
/**
@@ -118,7 +134,8 @@ public class CallStatus {
return "CallStatus{" +
"code=" + code +
", cause=" + cause +
- ", description='" + description + '\'' +
+ ", description='" + description +
+ ", metadata='" + metadata + '\'' +
'}';
}
}
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
similarity index 50%
copy from java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
copy to java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
index 04d0cf8..6669ce4 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
@@ -15,58 +15,67 @@
* limitations under the License.
*/
-package org.apache.arrow.flight.grpc;
+package org.apache.arrow.flight;
+import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
-import org.apache.arrow.flight.CallHeaders;
-
-import io.grpc.Metadata;
-import io.grpc.Metadata.Key;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
/**
- * A mutable adapter between the gRPC Metadata object and the Flight headers interface.
- *
- * <p>This allows us to present the headers (metadata) from gRPC without copying to/from our own object.
+ * metadata container specific to the binary metadata held in the grpc trailer.
*/
-class MetadataAdapter implements CallHeaders {
+public class ErrorFlightMetadata implements CallHeaders {
+ private final Multimap<String, byte[]> metadata = LinkedListMultimap.create();
- private final Metadata metadata;
-
- MetadataAdapter(Metadata metadata) {
- this.metadata = metadata;
+ public ErrorFlightMetadata() {
}
+
@Override
public String get(String key) {
- return this.metadata.get(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
+ return new String(getByte(key), StandardCharsets.US_ASCII);
+ }
+
+ @Override
+ public byte[] getByte(String key) {
+ return Iterables.getLast(metadata.get(key));
}
@Override
public Iterable<String> getAll(String key) {
- return this.metadata.getAll(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
+ return StreamSupport.stream(
+ getAllByte(key).spliterator(), false)
+ .map(b -> new String(b, StandardCharsets.US_ASCII))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Iterable<byte[]> getAllByte(String key) {
+ return metadata.get(key);
}
@Override
public void insert(String key, String value) {
- this.metadata.put(Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
+ metadata.put(key, value.getBytes());
}
@Override
- public Set<String> keys() {
- // Remove binary keys - we don't expose those
- return this.metadata.keys().stream().filter(key -> !key.endsWith(Metadata.BINARY_HEADER_SUFFIX))
- .collect(Collectors.toSet());
+ public void insert(String key, byte[] value) {
+ metadata.put(key, value);
}
@Override
- public boolean containsKey(String key) {
- final Key<?> grpcKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
- return this.metadata.containsKey(grpcKey);
+ public Set<String> keys() {
+ return metadata.keySet();
}
- public String toString() {
- return this.metadata.toString();
+ @Override
+ public boolean containsKey(String key) {
+ return metadata.containsKey(key);
}
}
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
index 74abead..ae11e52 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
@@ -110,7 +110,7 @@ public class ClientInterceptorAdapter implements ClientInterceptor {
final MetadataAdapter adapter = new MetadataAdapter(trailers);
middleware.forEach(m -> m.onHeadersReceived(adapter));
}
- final CallStatus flightStatus = StatusUtils.fromGrpcStatus(status);
+ final CallStatus flightStatus = StatusUtils.fromGrpcStatusAndTrailers(status, trailers);
middleware.forEach(m -> m.onCallCompleted(flightStatus));
} finally {
// Make sure to always call the gRPC callback to avoid interrupting the gRPC request cycle
@@ -141,7 +141,7 @@ public class ClientInterceptorAdapter implements ClientInterceptor {
@Override
public void cancel(String message, Throwable cause) {
- final CallStatus flightStatus = new CallStatus(FlightStatusCode.CANCELLED, cause, message);
+ final CallStatus flightStatus = new CallStatus(FlightStatusCode.CANCELLED, cause, message, null);
middleware.forEach(m -> m.onCallCompleted(flightStatus));
super.cancel(message, cause);
}
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
index 04d0cf8..5f127ac 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
@@ -19,6 +19,7 @@ package org.apache.arrow.flight.grpc;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.arrow.flight.CallHeaders;
@@ -44,16 +45,38 @@ class MetadataAdapter implements CallHeaders {
}
@Override
+ public byte[] getByte(String key) {
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ return this.metadata.get(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
+ }
+ return get(key).getBytes();
+ }
+
+ @Override
public Iterable<String> getAll(String key) {
return this.metadata.getAll(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}
@Override
+ public Iterable<byte[]> getAllByte(String key) {
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ return this.metadata.getAll(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
+ }
+ return StreamSupport.stream(getAll(key).spliterator(), false)
+ .map(String::getBytes).collect(Collectors.toList());
+ }
+
+ @Override
public void insert(String key, String value) {
this.metadata.put(Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}
@Override
+ public void insert(String key, byte[] value) {
+ this.metadata.put(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), value);
+ }
+
+ @Override
public Set<String> keys() {
// Remove binary keys - we don't expose those
return this.metadata.keys().stream().filter(key -> !key.endsWith(Metadata.BINARY_HEADER_SUFFIX))
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
index 36f2be4..7ff0e1b 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
@@ -22,9 +22,11 @@ import java.util.Objects;
import java.util.function.Function;
import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.ErrorFlightMetadata;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
+import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
@@ -119,9 +121,22 @@ public class StatusUtils {
}
}
+ /** Convert from a gRPC Status & trailers to a Flight status. */
+ public static CallStatus fromGrpcStatusAndTrailers(Status status, Metadata trailers) {
+ return new CallStatus(
+ fromGrpcStatusCode(status.getCode()),
+ status.getCause(),
+ status.getDescription(),
+ parseTrailers(trailers));
+ }
+
/** Convert from a gRPC status to a Flight status. */
public static CallStatus fromGrpcStatus(Status status) {
- return new CallStatus(fromGrpcStatusCode(status.getCode()), status.getCause(), status.getDescription());
+ return new CallStatus(
+ fromGrpcStatusCode(status.getCode()),
+ status.getCause(),
+ status.getDescription(),
+ null);
}
/** Convert from a Flight status to a gRPC status. */
@@ -131,7 +146,23 @@ public class StatusUtils {
/** Convert from a gRPC exception to a Flight exception. */
public static FlightRuntimeException fromGrpcRuntimeException(StatusRuntimeException sre) {
- return fromGrpcStatus(sre.getStatus()).toRuntimeException();
+ return fromGrpcStatusAndTrailers(sre.getStatus(), sre.getTrailers()).toRuntimeException();
+ }
+
+
+ private static ErrorFlightMetadata parseTrailers(Metadata trailers) {
+ ErrorFlightMetadata metadata = new ErrorFlightMetadata();
+ for (String key : trailers.keys()) {
+ if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ metadata.insert(key, trailers.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)));
+ } else {
+ metadata.insert(key,
+ Objects.requireNonNull(
+ trailers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))).getBytes()
+ );
+ }
+ }
+ return metadata;
}
/**
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
new file mode 100644
index 0000000..b6d344f
--- /dev/null
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.flight.perf.impl.PerfOuterClass;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Status;
+
+import io.grpc.Metadata;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.protobuf.StatusProto;
+
+public class TestErrorMetadata {
+ private static final Metadata.BinaryMarshaller<Status> marshaller =
+ ProtoUtils.metadataMarshaller(Status.getDefaultInstance());
+
+ @Test
+ public void testMetadata() throws Exception {
+ PerfOuterClass.Perf perf = PerfOuterClass.Perf.newBuilder()
+ .setStreamCount(12)
+ .setRecordsPerBatch(1000)
+ .setRecordsPerStream(1000000L)
+ .build();
+ try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ final FlightServer s =
+ FlightTestUtil.getStartedServer(
+ (location) -> FlightServer.builder(allocator, location, new TestFlightProducer(perf)).build());
+ final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
+ FlightStream stream = client.getStream(new Ticket("abs".getBytes()));
+ stream.next();
+ Assert.fail();
+ } catch (FlightRuntimeException fre) {
+ PerfOuterClass.Perf newPerf = null;
+ ErrorFlightMetadata metadata = fre.status().metadata();
+ Assert.assertNotNull(metadata);
+ Assert.assertEquals(2, metadata.keys().size());
+ Assert.assertTrue(metadata.containsKey("grpc-status-details-bin"));
+ Status status = marshaller.parseBytes(metadata.getByte("grpc-status-details-bin"));
+ for (Any details : status.getDetailsList()) {
+ if (details.is(PerfOuterClass.Perf.class)) {
+ try {
+ newPerf = details.unpack(PerfOuterClass.Perf.class);
+ } catch (InvalidProtocolBufferException e) {
+ Assert.fail();
+ }
+ }
+ }
+ Assert.assertNotNull(newPerf);
+ Assert.assertEquals(perf, newPerf);
+ }
+ }
+
+ private static class TestFlightProducer extends NoOpFlightProducer {
+ private final PerfOuterClass.Perf perf;
+
+ private TestFlightProducer(PerfOuterClass.Perf perf) {
+ this.perf = perf;
+ }
+
+ @Override
+ public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+ StatusRuntimeException sre = StatusProto.toStatusRuntimeException(Status.newBuilder()
+ .setCode(1)
+ .setMessage("Testing 1 2 3")
+ .addDetails(Any.pack(perf, "arrow/meta/types"))
+ .build());
+ listener.error(sre);
+ }
+ }
+}