You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/12/03 06:21:47 UTC

[servicecomb-java-chassis] 01/02: [SCB-968]968 http2 do not support pump download

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit fde6f4dc8c52360476dcfe0b4845a17abae2a758
Author: heyile <25...@qq.com>
AuthorDate: Thu Nov 22 20:14:59 2018 +0800

    [SCB-968]968 http2 do not support pump download
---
 .../foundation/vertx/stream/PumpFactoryImpl.java   |  40 ++++++++
 .../foundation/vertx/stream/PumpImpl.java          | 108 +++++++++++++++++++++
 .../services/io.vertx.core.spi.PumpFactory         |   1 +
 .../vertx/stream/TestPumpFactoryImpl.java          |  33 +++++++
 .../foundation/vertx/stream/TestPumpImpl.java      |  50 ++++++++++
 .../org/apache/servicecomb/it/ConsumerMain.java    |   5 +-
 .../servicecomb/it/schema/DownloadSchema.java      |   1 -
 7 files changed, 233 insertions(+), 5 deletions(-)

diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java
new file mode 100644
index 0000000..c3284e3
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.util.Objects;
+
+import io.vertx.core.spi.PumpFactory;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpFactoryImpl implements PumpFactory {
+  @Override
+  public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws) {
+    Objects.requireNonNull(rs);
+    Objects.requireNonNull(ws);
+    return new PumpImpl<>(rs, ws);
+  }
+
+  @Override
+  public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws, int writeQueueMaxSize) {
+    Objects.requireNonNull(rs);
+    Objects.requireNonNull(ws);
+    return new PumpImpl<>(rs, ws, writeQueueMaxSize);
+  }
+}
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java
new file mode 100644
index 0000000..3657e73
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpImpl<T> implements Pump {
+
+  private final ReadStream<T> readStream;
+
+  private final WriteStream<T> writeStream;
+
+  private final Handler<T> dataHandler;
+
+  private final Handler<Void> drainHandler;
+
+  private int pumped;
+
+  public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream, int maxWriteQueueSize) {
+    this(readStream, writeStream);
+    this.writeStream.setWriteQueueMaxSize(maxWriteQueueSize);
+  }
+
+  public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream) {
+    this.readStream = readStream;
+    this.writeStream = writeStream;
+    drainHandler = v -> readStream.resume();
+    dataHandler = data -> {
+      if (data instanceof Buffer) {
+        if (((Buffer) data).length() == 0) {
+          return;
+        }
+      }
+      writeStream.write(data);
+      incPumped();
+      if (writeStream.writeQueueFull()) {
+        readStream.pause();
+        writeStream.drainHandler(drainHandler);
+      }
+    };
+  }
+
+
+  /**
+   * Set the write queue max size to {@code maxSize}
+   */
+  @Override
+  public PumpImpl<T> setWriteQueueMaxSize(int maxSize) {
+    writeStream.setWriteQueueMaxSize(maxSize);
+    return this;
+  }
+
+  /**
+   * Start the Pump. The Pump can be started and stopped multiple times.
+   */
+  @Override
+  public PumpImpl<T> start() {
+    readStream.handler(dataHandler);
+    return this;
+  }
+
+  /**
+   * Stop the Pump. The Pump can be started and stopped multiple times.
+   */
+  @Override
+  public PumpImpl<T> stop() {
+    writeStream.drainHandler(null);
+    readStream.handler(null);
+    return this;
+  }
+
+  /**
+   * Return the total number of elements pumped by this pump.
+   */
+  @Override
+  public synchronized int numberPumped() {
+    return pumped;
+  }
+
+  // Note we synchronize as numberPumped can be called from a different thread however incPumped will always
+  // be called from the same thread so we benefit from bias locked optimisation which should give a very low
+  // overhead
+  private synchronized void incPumped() {
+    pumped++;
+  }
+
+  public Handler<T> getDataHandler() {
+    return dataHandler;
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory
new file mode 100644
index 0000000..1d25eef
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory
@@ -0,0 +1 @@
+org.apache.servicecomb.foundation.vertx.stream.PumpFactoryImpl
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java
new file mode 100644
index 0000000..f86f46d
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+import mockit.Mocked;
+
+public class TestPumpFactoryImpl {
+  @Test
+  public void pump(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws) {
+    Pump pump = Pump.pump(rs, ws);
+    Assert.assertTrue(pump instanceof PumpImpl);
+  }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java
new file mode 100644
index 0000000..a2ccb17
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class TestPumpImpl {
+
+  @Test
+  public void testPumpWithPending(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws, @Mocked Buffer zeroBuf,
+      @Mocked Buffer contentBuf) {
+    PumpImpl<Object> pump = new PumpImpl<>(rs, ws);
+    Handler<Object> handler = pump.getDataHandler();
+    new Expectations() {
+      {
+        zeroBuf.length();
+        result = 0;
+        contentBuf.length();
+        result = 1;
+      }
+    };
+    handler.handle(zeroBuf);
+    handler.handle(contentBuf);
+    Assert.assertEquals(1, pump.numberPumped());
+    handler.handle(contentBuf);
+    Assert.assertEquals(2, pump.numberPumped());
+  }
+}
diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
index 2209583..0ef5039 100644
--- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
+++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
@@ -103,10 +103,7 @@ public class ConsumerMain {
     // only rest support default value feature
     ITJUnitUtils.runWithRest(TestDefaultValue.class);
 
-    // currently have bug with http2
-    if (!ITJUnitUtils.getProducerName().endsWith("-h2") && !ITJUnitUtils.getProducerName().endsWith("-h2c")) {
-      ITJUnitUtils.runWithRest(TestDownload.class);
-    }
+    ITJUnitUtils.runWithRest(TestDownload.class);
 
     ITJUnitUtils.runWithHighwayAndRest(TestTrace.class);
     ITJUnitUtils.run(TestTraceEdge.class);
diff --git a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
index b1123b6..5ba7d8b 100644
--- a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
+++ b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
@@ -197,7 +197,6 @@ public class DownloadSchema implements BootListener {
         .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE)
         .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=netInputStream.txt")
         .body(conn.getInputStream());
-    conn.disconnect();
     return responseEntity;
   }