You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/07/18 07:14:30 UTC

[GitHub] sijie closed pull request #1550: [TABLE SERVICE] Support getStreamById in root range store.

sijie closed pull request #1550: [TABLE SERVICE] Support getStreamById in root range store.
URL: https://github.com/apache/bookkeeper/pull/1550
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is supplied
below (GitHub would not otherwise display it after the merge):

diff --git a/stream/proto/src/main/proto/storage.proto b/stream/proto/src/main/proto/storage.proto
index e33ecbe48..664809097 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -35,6 +35,7 @@ enum StatusCode {
 
     // 4xx: client errors
     BAD_REQUEST                 = 400;
+    ILLEGAL_OP                  = 403;
 
     // 5xx: server errors
     INTERNAL_SERVER_ERROR       = 500;
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 4314bcec8..2f2c4048e 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -52,6 +52,7 @@
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest.IdCase;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -66,11 +67,12 @@
 
     private static final byte SYSTEM_TAG = (byte) 0xff;
     private static final byte NS_NAME_TAG = (byte) 0x01;
-    private static final byte NS_ID_TAG = (byte) 0x01;
+    private static final byte NS_ID_TAG = (byte) 0x02;
 
     // separator used for separating streams within a same namespace
     private static final byte NS_STREAM_NAME_SEP = (byte) 0x03;
     private static final byte NS_STREAM_ID_SEP = (byte) 0x04;
+    private static final byte STREAM_ID_TAG = (byte) 0x05;
     private static final byte NS_END_SEP = (byte) 0xff;
 
     static final byte[] NS_ID_KEY = new byte[]{SYSTEM_TAG, 'n', 's', 'i', 'd'};
@@ -133,6 +135,16 @@
         return streamIdBytes;
     }
 
+    /**
+     * stream id: [STREAM_ID_TAG][stream_id].
+     */
+    static final byte[] getStreamIdKey(long streamId) {
+        byte[] streamIdBytes = new byte[Long.BYTES + 1];
+        streamIdBytes[0] = STREAM_ID_TAG;
+        Bytes.toBytes(streamId, streamIdBytes, 1);
+        return streamIdBytes;
+    }
+
     private final URI defaultServiceUri;
     private final MVCCAsyncStore<byte[], byte[]> store;
     private final StorageContainerPlacementPolicy placementPolicy;
@@ -468,6 +480,8 @@ StatusCode verifyStreamRequest(String nsName, String streamName) {
         byte[] streamNameVal = Bytes.toBytes(streamId);
         byte[] streamIdKey = getStreamIdKey(nsId, streamId);
         byte[] streamIdVal = streamProps.toByteArray();
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+        byte[] streamReverseIndexValue = Bytes.toBytes(nsId);
 
         TxnOp<byte[], byte[]> txn = store.newTxn()
             .If(
@@ -480,6 +494,7 @@ StatusCode verifyStreamRequest(String nsName, String streamName) {
             .Then(
                 store.newPut(streamNameKey, streamNameVal),
                 store.newPut(streamIdKey, streamIdVal),
+                store.newPut(streamReverseIndexKey, streamReverseIndexValue),
                 store.newPut(STREAM_ID_KEY, Bytes.toBytes(streamId))
             )
             .build();
@@ -558,16 +573,19 @@ StatusCode verifyStreamRequest(String nsName, String streamName) {
         byte[] nsIdKey = getNamespaceIdKey(nsId);
         byte[] streamNameKey = getStreamNameKey(nsId, streamName);
         byte[] streamIdKey = getStreamIdKey(nsId, streamId);
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
 
         TxnOp<byte[], byte[]> txnOp = store.newTxn()
             .If(
                 store.newCompareValue(CompareResult.NOT_EQUAL, nsIdKey, null),
                 store.newCompareValue(CompareResult.NOT_EQUAL, streamNameKey, null),
-                store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey, null)
+                store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey, null),
+                store.newCompareValue(CompareResult.NOT_EQUAL, streamReverseIndexKey, null)
             )
             .Then(
                 store.newDelete(streamIdKey),
-                store.newDelete(streamNameKey)
+                store.newDelete(streamNameKey),
+                store.newDelete(streamReverseIndexKey)
             )
             .build();
 
@@ -586,40 +604,74 @@ StatusCode verifyStreamRequest(String nsName, String streamName) {
         }).whenComplete((resp, cause) -> txnOp.close());
     }
 
+    private CompletableFuture<GetStreamResponse> streamPropertiesToResponse(
+        CompletableFuture<StreamProperties> propsFuture
+    ) {
+        GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+        return propsFuture.thenCompose(streamProps -> {
+            if (null == streamProps) {
+                return FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
+            } else {
+                return FutureUtils.value(respBuilder
+                    .setCode(StatusCode.SUCCESS)
+                    .setStreamProps(streamProps)
+                    .build());
+            }
+        }).exceptionally(cause ->
+            respBuilder
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .build()
+        );
+    }
+
     @Override
     public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
-        StreamName streamName = request.getStreamName();
+        if (IdCase.STREAM_ID == request.getIdCase()) {
+            return streamPropertiesToResponse(
+                getStreamProps(request.getStreamId()));
+        } else if (IdCase.STREAM_NAME == request.getIdCase()) {
+            return getStreamProps(request.getStreamName());
+        } else {
+            return FutureUtils.value(GetStreamResponse.newBuilder()
+                .setCode(StatusCode.ILLEGAL_OP)
+                .build());
+        }
+    }
 
+    CompletableFuture<StreamProperties> getStreamProps(long streamId) {
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+
+        return store.get(streamReverseIndexKey).thenCompose(nsIdBytes -> {
+            if (null == nsIdBytes) {
+                return FutureUtils.value(null);
+            }
+
+            long nsId = Bytes.toLong(nsIdBytes, 0);
+            return getStreamProps(nsId, streamId);
+        });
+    }
+
+    CompletableFuture<GetStreamResponse> getStreamProps(StreamName streamName) {
         StatusCode code = verifyStreamRequest(
-            streamName.getNamespaceName(),
-            streamName.getStreamName());
+                streamName.getNamespaceName(),
+                streamName.getStreamName());
         if (StatusCode.SUCCESS != code) {
-            return FutureUtils.value(GetStreamResponse.newBuilder().setCode(code).build());
+            return FutureUtils.value(GetStreamResponse.newBuilder()
+                .setCode(code).build());
         }
 
         byte[] nsNameKey = getNamespaceNameKey(streamName.getNamespaceName());
-        GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+
+
         return store.get(nsNameKey)
             .thenCompose(nsIdBytes -> {
                 if (null == nsIdBytes) {
-                    return FutureUtils.value(respBuilder.setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
+                    return FutureUtils.value(GetStreamResponse.newBuilder()
+                        .setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
                 }
-
                 long nsId = Bytes.toLong(nsIdBytes, 0);
-                return getStreamProps(nsId, streamName.getStreamName())
-                    .thenCompose(streamProps -> {
-                        if (null == streamProps) {
-                            return FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
-                        } else {
-                            return FutureUtils.value(respBuilder
-                                .setCode(StatusCode.SUCCESS)
-                                .setStreamProps(streamProps)
-                                .build());
-                        }
-                    })
-                    .exceptionally(cause -> respBuilder
-                        .setCode(StatusCode.INTERNAL_SERVER_ERROR)
-                        .build());
+                return streamPropertiesToResponse(
+                    getStreamProps(nsId, streamName.getStreamName()));
             });
     }
 
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index 93a9e4776..da3b6d111 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -461,6 +461,24 @@ public void testGetStreamSuccess() throws Exception {
         assertEquals(streamConf, getResp.getStreamProps().getStreamConf());
     }
 
+    @Test
+    public void testGetStreamByIdSuccess() throws Exception {
+        String nsName = name.getMethodName();
+        String streamName = name.getMethodName();
+
+        createNamespaceAndVerify(nsName, 0L);
+        createStreamAndVerify(nsName, streamName, MIN_DATA_STREAM_ID);
+        verifyStreamId(MIN_DATA_STREAM_ID);
+
+        CompletableFuture<GetStreamResponse> getFuture = rootRangeStore.getStream(
+            createGetStreamRequest(MIN_DATA_STREAM_ID));
+        GetStreamResponse getResp = FutureUtils.result(getFuture);
+        assertEquals(StatusCode.SUCCESS, getResp.getCode());
+        assertEquals(MIN_DATA_STREAM_ID, getResp.getStreamProps().getStreamId());
+        assertEquals(streamName, getResp.getStreamProps().getStreamName());
+        assertEquals(streamConf, getResp.getStreamProps().getStreamConf());
+    }
+
     @Test
     public void testGetStreamNotFound() throws Exception {
         String nsName = name.getMethodName();
@@ -476,4 +494,18 @@ public void testGetStreamNotFound() throws Exception {
         assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
     }
 
+    @Test
+    public void testGetStreamByIdNotFound() throws Exception {
+        String nsName = name.getMethodName();
+
+        createNamespaceAndVerify(nsName, 0L);
+
+        verifyStreamId(-1);
+
+        CompletableFuture<GetStreamResponse> getFuture = rootRangeStore.getStream(
+            createGetStreamRequest(MIN_DATA_STREAM_ID));
+        GetStreamResponse response = FutureUtils.result(getFuture);
+        assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services