You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2020/03/22 15:40:57 UTC

[arrow] branch master updated: ARROW-8181: [Java][FlightRPC] Expose transport error metadata

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 00266d1  ARROW-8181: [Java][FlightRPC] Expose transport error metadata
00266d1 is described below

commit 00266d1c5ccc85020fbeb24b07670476a43023d7
Author: Ryan Murray <ry...@dremio.com>
AuthorDate: Sun Mar 22 11:40:22 2020 -0400

    ARROW-8181: [Java][FlightRPC] Expose transport error metadata
    
    Add a class to act as a container for metadata (akin to gRPC `Metadata`) and expose this metadata on `FlightRuntimeException`.
    
    Closes #6682 from rymurr/ARROW-8181 and squashes the following commits:
    
    3fbcd2da2 <Ryan Murray> unify `FlightMetadata` and `CallHeaders`
    85202263d <Ryan Murray> address code review from @lidavidm
    eba658c1b <Ryan Murray> expose transport error metadata
    
    Authored-by: Ryan Murray <ry...@dremio.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 java/flight/flight-core/pom.xml                    |  7 ++
 .../java/org/apache/arrow/flight/CallHeaders.java  | 37 ++++++---
 .../java/org/apache/arrow/flight/CallStatus.java   | 27 +++++--
 ...tadataAdapter.java => ErrorFlightMetadata.java} | 59 ++++++++------
 .../flight/grpc/ClientInterceptorAdapter.java      |  4 +-
 .../apache/arrow/flight/grpc/MetadataAdapter.java  | 23 ++++++
 .../org/apache/arrow/flight/grpc/StatusUtils.java  | 35 +++++++-
 .../org/apache/arrow/flight/TestErrorMetadata.java | 92 ++++++++++++++++++++++
 8 files changed, 240 insertions(+), 44 deletions(-)

diff --git a/java/flight/flight-core/pom.xml b/java/flight/flight-core/pom.xml
index fb92467..d53bd8e 100644
--- a/java/flight/flight-core/pom.xml
+++ b/java/flight/flight-core/pom.xml
@@ -125,6 +125,13 @@
       <groupId>javax.annotation</groupId>
       <artifactId>javax.annotation-api</artifactId>
      </dependency>
+
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>proto-google-common-protos</artifactId>
+      <version>1.12.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <extensions>
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
index 3a47be1..5f272b6 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallHeaders.java
@@ -20,32 +20,49 @@ package org.apache.arrow.flight;
 import java.util.Set;
 
 /**
- * A set of headers for a call (request or response).
- *
- * <p>Only text (ASCII) headers are supported.
+ * A set of metadata key value pairs for a call (request or response).
  */
 public interface CallHeaders {
-
   /**
-   * Get the value of a header. If multiple values are present, then get the last one.
+   * Get the value of a metadata key. If multiple values are present, then get the last one.
    */
+  @Deprecated()
   String get(String key);
 
   /**
-   * Get all values present for the given header.
+   * Get the value of a metadata key. If multiple values are present, then get the last one.
+   */
+  byte[] getByte(String key);
+
+  /**
+   * Get all values present for the given metadata key.
    */
+  @Deprecated
   Iterable<String> getAll(String key);
 
   /**
-   * Insert a header with the given value.
+   * Get all values present for the given metadata key.
+   */
+  Iterable<byte[]> getAllByte(String key);
+
+  /**
+   * Insert a metadata pair with the given value.
    *
-   * <p>Duplicate headers are permitted.
+   * <p>Duplicate metadata are permitted.
    */
+  @Deprecated
   void insert(String key, String value);
 
-  /** Get a set of all the headers. */
+  /**
+   * Insert a metadata pair with the given value.
+   *
+   * <p>Duplicate metadata are permitted.
+   */
+  void insert(String key, byte[] value);
+
+  /** Get a set of all the metadata keys. */
   Set<String> keys();
 
-  /** Check whether the given header is present. */
+  /** Check whether the given metadata key is present. */
   boolean containsKey(String key);
 }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
index c1d0c2a..5017317 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
@@ -36,6 +36,7 @@ public class CallStatus {
   private final FlightStatusCode code;
   private final Throwable cause;
   private final String description;
+  private final ErrorFlightMetadata metadata;
 
   public static final CallStatus UNKNOWN = FlightStatusCode.UNKNOWN.toStatus();
   public static final CallStatus INTERNAL = FlightStatusCode.INTERNAL.toStatus();
@@ -56,10 +57,11 @@ public class CallStatus {
    * @param cause An exception that resulted in this status (or null).
    * @param description A description of the status (or null).
    */
-  public CallStatus(FlightStatusCode code, Throwable cause, String description) {
+  public CallStatus(FlightStatusCode code, Throwable cause, String description, ErrorFlightMetadata metadata) {
     this.code = Objects.requireNonNull(code);
     this.cause = cause;
     this.description = description == null ? "" : description;
+    this.metadata = metadata == null ? new ErrorFlightMetadata() : metadata;
   }
 
   /**
@@ -68,7 +70,7 @@ public class CallStatus {
    * @param code The status code.
    */
   public CallStatus(FlightStatusCode code) {
-    this(code, /* no cause */ null, /* no description */ null);
+    this(code, /* no cause */ null, /* no description */ null, /* no metadata */ null);
   }
 
   /**
@@ -93,17 +95,31 @@ public class CallStatus {
   }
 
   /**
+   * Metadata associated with the exception.
+   */
+  public ErrorFlightMetadata metadata() {
+    return metadata;
+  }
+
+  /**
    * Return a copy of this status with an error message.
    */
   public CallStatus withDescription(String message) {
-    return new CallStatus(code, cause, message);
+    return new CallStatus(code, cause, message, metadata);
   }
 
   /**
    * Return a copy of this status with the given exception as the cause. This will not be sent over the wire.
    */
   public CallStatus withCause(Throwable t) {
-    return new CallStatus(code, t, description);
+    return new CallStatus(code, t, description, metadata);
+  }
+
+  /**
+   * Return a copy of this status with associated exception metadata.
+   */
+  public CallStatus withMetadata(ErrorFlightMetadata metadata) {
+    return new CallStatus(code, cause, description, metadata);
   }
 
   /**
@@ -118,7 +134,8 @@ public class CallStatus {
     return "CallStatus{" +
         "code=" + code +
         ", cause=" + cause +
-        ", description='" + description + '\'' +
+        ", description='" + description +
+        ", metadata='" + metadata + '\'' +
         '}';
   }
 }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
similarity index 50%
copy from java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
copy to java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
index 04d0cf8..6669ce4 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ErrorFlightMetadata.java
@@ -15,58 +15,67 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.flight.grpc;
+package org.apache.arrow.flight;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
-import org.apache.arrow.flight.CallHeaders;
-
-import io.grpc.Metadata;
-import io.grpc.Metadata.Key;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
 
 /**
- * A mutable adapter between the gRPC Metadata object and the Flight headers interface.
- *
- * <p>This allows us to present the headers (metadata) from gRPC without copying to/from our own object.
+ * metadata container specific to the binary metadata held in the grpc trailer.
  */
-class MetadataAdapter implements CallHeaders {
+public class ErrorFlightMetadata implements CallHeaders {
+  private final Multimap<String, byte[]> metadata = LinkedListMultimap.create();
 
-  private final Metadata metadata;
-
-  MetadataAdapter(Metadata metadata) {
-    this.metadata = metadata;
+  public ErrorFlightMetadata() {
   }
 
+
   @Override
   public String get(String key) {
-    return this.metadata.get(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
+    return new String(getByte(key), StandardCharsets.US_ASCII);
+  }
+
+  @Override
+  public byte[] getByte(String key) {
+    return Iterables.getLast(metadata.get(key));
   }
 
   @Override
   public Iterable<String> getAll(String key) {
-    return this.metadata.getAll(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
+    return StreamSupport.stream(
+        getAllByte(key).spliterator(), false)
+        .map(b -> new String(b, StandardCharsets.US_ASCII))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public Iterable<byte[]> getAllByte(String key) {
+    return metadata.get(key);
   }
 
   @Override
   public void insert(String key, String value) {
-    this.metadata.put(Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
+    metadata.put(key, value.getBytes());
   }
 
   @Override
-  public Set<String> keys() {
-    // Remove binary keys - we don't expose those
-    return this.metadata.keys().stream().filter(key -> !key.endsWith(Metadata.BINARY_HEADER_SUFFIX))
-        .collect(Collectors.toSet());
+  public void insert(String key, byte[] value) {
+    metadata.put(key, value);
   }
 
   @Override
-  public boolean containsKey(String key) {
-    final Key<?> grpcKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
-    return this.metadata.containsKey(grpcKey);
+  public Set<String> keys() {
+    return metadata.keySet();
   }
 
-  public String toString() {
-    return this.metadata.toString();
+  @Override
+  public boolean containsKey(String key) {
+    return metadata.containsKey(key);
   }
 }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
index 74abead..ae11e52 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ClientInterceptorAdapter.java
@@ -110,7 +110,7 @@ public class ClientInterceptorAdapter implements ClientInterceptor {
           final MetadataAdapter adapter = new MetadataAdapter(trailers);
           middleware.forEach(m -> m.onHeadersReceived(adapter));
         }
-        final CallStatus flightStatus = StatusUtils.fromGrpcStatus(status);
+        final CallStatus flightStatus = StatusUtils.fromGrpcStatusAndTrailers(status, trailers);
         middleware.forEach(m -> m.onCallCompleted(flightStatus));
       } finally {
         // Make sure to always call the gRPC callback to avoid interrupting the gRPC request cycle
@@ -141,7 +141,7 @@ public class ClientInterceptorAdapter implements ClientInterceptor {
 
     @Override
     public void cancel(String message, Throwable cause) {
-      final CallStatus flightStatus = new CallStatus(FlightStatusCode.CANCELLED, cause, message);
+      final CallStatus flightStatus = new CallStatus(FlightStatusCode.CANCELLED, cause, message, null);
       middleware.forEach(m -> m.onCallCompleted(flightStatus));
       super.cancel(message, cause);
     }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
index 04d0cf8..5f127ac 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/MetadataAdapter.java
@@ -19,6 +19,7 @@ package org.apache.arrow.flight.grpc;
 
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.arrow.flight.CallHeaders;
 
@@ -44,16 +45,38 @@ class MetadataAdapter implements CallHeaders {
   }
 
   @Override
+  public byte[] getByte(String key) {
+    if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+      return this.metadata.get(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
+    }
+    return get(key).getBytes();
+  }
+
+  @Override
   public Iterable<String> getAll(String key) {
     return this.metadata.getAll(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
   }
 
   @Override
+  public Iterable<byte[]> getAllByte(String key) {
+    if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+      return this.metadata.getAll(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
+    }
+    return StreamSupport.stream(getAll(key).spliterator(), false)
+        .map(String::getBytes).collect(Collectors.toList());
+  }
+
+  @Override
   public void insert(String key, String value) {
     this.metadata.put(Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
   }
 
   @Override
+  public void insert(String key, byte[] value) {
+    this.metadata.put(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), value);
+  }
+
+  @Override
   public Set<String> keys() {
     // Remove binary keys - we don't expose those
     return this.metadata.keys().stream().filter(key -> !key.endsWith(Metadata.BINARY_HEADER_SUFFIX))
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
index 36f2be4..7ff0e1b 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/StatusUtils.java
@@ -22,9 +22,11 @@ import java.util.Objects;
 import java.util.function.Function;
 
 import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.ErrorFlightMetadata;
 import org.apache.arrow.flight.FlightRuntimeException;
 import org.apache.arrow.flight.FlightStatusCode;
 
+import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.StatusException;
@@ -119,9 +121,22 @@ public class StatusUtils {
     }
   }
 
+  /** Convert from a gRPC Status & trailers to a Flight status. */
+  public static CallStatus fromGrpcStatusAndTrailers(Status status, Metadata trailers) {
+    return new CallStatus(
+              fromGrpcStatusCode(status.getCode()),
+              status.getCause(),
+              status.getDescription(),
+              parseTrailers(trailers));
+  }
+
   /** Convert from a gRPC status to a Flight status. */
   public static CallStatus fromGrpcStatus(Status status) {
-    return new CallStatus(fromGrpcStatusCode(status.getCode()), status.getCause(), status.getDescription());
+    return new CallStatus(
+              fromGrpcStatusCode(status.getCode()),
+              status.getCause(),
+              status.getDescription(),
+              null);
   }
 
   /** Convert from a Flight status to a gRPC status. */
@@ -131,7 +146,23 @@ public class StatusUtils {
 
   /** Convert from a gRPC exception to a Flight exception. */
   public static FlightRuntimeException fromGrpcRuntimeException(StatusRuntimeException sre) {
-    return fromGrpcStatus(sre.getStatus()).toRuntimeException();
+    return fromGrpcStatusAndTrailers(sre.getStatus(), sre.getTrailers()).toRuntimeException();
+  }
+
+
+  private static ErrorFlightMetadata parseTrailers(Metadata trailers) {
+    ErrorFlightMetadata metadata = new ErrorFlightMetadata();
+    for (String key : trailers.keys()) {
+      if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+        metadata.insert(key, trailers.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)));
+      } else {
+        metadata.insert(key,
+                     Objects.requireNonNull(
+                     trailers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))).getBytes()
+        );
+      }
+    }
+    return metadata;
   }
 
   /**
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
new file mode 100644
index 0000000..b6d344f
--- /dev/null
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.flight.perf.impl.PerfOuterClass;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Status;
+
+import io.grpc.Metadata;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.protobuf.StatusProto;
+
+public class TestErrorMetadata {
+  private static final Metadata.BinaryMarshaller<Status> marshaller =
+          ProtoUtils.metadataMarshaller(Status.getDefaultInstance());
+
+  @Test
+  public void testMetadata() throws Exception {
+    PerfOuterClass.Perf perf = PerfOuterClass.Perf.newBuilder()
+                .setStreamCount(12)
+                .setRecordsPerBatch(1000)
+                .setRecordsPerStream(1000000L)
+                .build();
+    try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        final FlightServer s =
+             FlightTestUtil.getStartedServer(
+               (location) -> FlightServer.builder(allocator, location, new TestFlightProducer(perf)).build());
+          final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
+      FlightStream stream = client.getStream(new Ticket("abs".getBytes()));
+      stream.next();
+      Assert.fail();
+    } catch (FlightRuntimeException fre) {
+      PerfOuterClass.Perf newPerf = null;
+      ErrorFlightMetadata metadata = fre.status().metadata();
+      Assert.assertNotNull(metadata);
+      Assert.assertEquals(2, metadata.keys().size());
+      Assert.assertTrue(metadata.containsKey("grpc-status-details-bin"));
+      Status status = marshaller.parseBytes(metadata.getByte("grpc-status-details-bin"));
+      for (Any details : status.getDetailsList()) {
+        if (details.is(PerfOuterClass.Perf.class)) {
+          try {
+            newPerf = details.unpack(PerfOuterClass.Perf.class);
+          } catch (InvalidProtocolBufferException e) {
+            Assert.fail();
+          }
+        }
+      }
+      Assert.assertNotNull(newPerf);
+      Assert.assertEquals(perf, newPerf);
+    }
+  }
+
+  private static class TestFlightProducer extends NoOpFlightProducer {
+    private final PerfOuterClass.Perf perf;
+
+    private TestFlightProducer(PerfOuterClass.Perf perf) {
+      this.perf = perf;
+    }
+
+    @Override
+    public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+      StatusRuntimeException sre = StatusProto.toStatusRuntimeException(Status.newBuilder()
+              .setCode(1)
+              .setMessage("Testing 1 2 3")
+              .addDetails(Any.pack(perf, "arrow/meta/types"))
+              .build());
+      listener.error(sre);
+    }
+  }
+}