You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:46 UTC
[2/8] incubator-rocketmq git commit: initialize RocketMQ5
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
new file mode 100644
index 0000000..219ee6e
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+
+public class StatsItem {
+ private final AtomicLong value = new AtomicLong(0);
+ private final AtomicLong times = new AtomicLong(0);
+ private final AtomicLong[] valueIncDistributeRegion = new AtomicLong[10];
+ private final AtomicLong valueMaxInMinutes = new AtomicLong(0);
+ private final AtomicLong valueMaxIn10Minutes = new AtomicLong(0);
+ private final AtomicLong valueMaxInHour = new AtomicLong(0);
+
+ private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+ private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+ private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final String statsName;
+ private final String statsKey;
+ private final Logger log;
+
+ public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, Logger log) {
+ this.statsName = statsName;
+ this.statsKey = statsKey;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+
+ for (int i = 0; i < this.valueIncDistributeRegion.length; i++) {
+ valueIncDistributeRegion[i] = new AtomicLong(0);
+ }
+ }
+
+ public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
+ long prev = target.get();
+ while (value > prev) {
+ boolean updated = target.compareAndSet(prev, value);
+ if (updated)
+ return true;
+
+ prev = target.get();
+ }
+
+ return false;
+ }
+
+ private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+ StatsSnapshot statsSnapshot = new StatsSnapshot();
+ synchronized (csList) {
+ double tps = 0;
+ double avgpt = 0;
+ long sum = 0;
+ if (!csList.isEmpty()) {
+ CallSnapshot first = csList.getFirst();
+ CallSnapshot last = csList.getLast();
+ sum = last.getValue() - first.getValue();
+ tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
+
+ long timesDiff = last.getTimes() - first.getTimes();
+ if (timesDiff > 0) {
+ avgpt = (sum * 1.0d) / timesDiff;
+ }
+
+ }
+
+ statsSnapshot.setSum(sum);
+ statsSnapshot.setTps(tps);
+ statsSnapshot.setAvgpt(avgpt);
+ }
+
+ return statsSnapshot;
+ }
+
+ public static long computNextMinutesTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 0);
+ cal.add(Calendar.HOUR_OF_DAY, 0);
+ cal.add(Calendar.MINUTE, 1);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+ public static long computNextHourTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 0);
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+ public static long computNextMorningTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 1);
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+ public void addValue(final int incValue, final int incTimes) {
+ this.value.addAndGet(incValue);
+ this.times.addAndGet(incTimes);
+ this.setValueIncDistributeRegion(incValue);
+ StatsItem.compareAndIncreaseOnly(this.valueMaxInMinutes, incValue);
+ StatsItem.compareAndIncreaseOnly(this.valueMaxIn10Minutes, incValue);
+ StatsItem.compareAndIncreaseOnly(this.valueMaxInHour, incValue);
+ }
+
+ public StatsSnapshot getStatsDataInMinute() {
+ return computeStatsData(this.csListMinute);
+ }
+
+ public StatsSnapshot getStatsDataInHour() {
+ return computeStatsData(this.csListHour);
+ }
+
+ public StatsSnapshot getStatsDataInDay() {
+ return computeStatsData(this.csListDay);
+ }
+
+ public void init() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInSeconds();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInMinutes();
+ } catch (Throwable ignored) {
+ }
+
+ }
+ }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60 * 10, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInHour();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, 0, 1, TimeUnit.HOURS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtHour();
+ } catch (Throwable ignored) {
+ }
+
+ }
+ }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtDay();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, //
+ 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+ }
+
+ public void samplingInSeconds() {
+ synchronized (this.csListMinute) {
+ this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get()));
+ if (this.csListMinute.size() > 7) {
+ this.csListMinute.removeFirst();
+ }
+ }
+ }
+
+ public void samplingInMinutes() {
+ synchronized (this.csListHour) {
+ this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get()));
+ if (this.csListHour.size() > 7) {
+ this.csListHour.removeFirst();
+ }
+ }
+
+ valueMaxIn10Minutes.set(0);
+ }
+
+ public void samplingInHour() {
+ synchronized (this.csListDay) {
+ this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get()));
+ if (this.csListDay.size() > 25) {
+ this.csListDay.removeFirst();
+ }
+ }
+ }
+
+ public void printAtMinutes() {
+ StatsSnapshot ss = computeStatsData(this.csListMinute);
+ log.info(String
+ .format(
+ "[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt(),
+ this.valueMaxInMinutes.get(),
+ this.valueMaxIn10Minutes.get(),
+ this.valueMaxInHour.get(),
+ Arrays.toString(valueRegion())
+ ));
+
+ valueMaxInMinutes.set(0);
+ }
+
+ public void printAtHour() {
+ StatsSnapshot ss = computeStatsData(this.csListHour);
+ log.info(String
+ .format(
+ "[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt(),
+ this.valueMaxInMinutes.get(),
+ this.valueMaxIn10Minutes.get(),
+ this.valueMaxInHour.get(),
+ Arrays.toString(valueRegion())
+ ));
+
+ valueMaxInHour.set(0);
+ }
+
+ public void printAtDay() {
+ StatsSnapshot ss = computeStatsData(this.csListDay);
+ log.info(String.format(
+ "[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt(),
+ this.valueMaxInMinutes.get(),
+ this.valueMaxIn10Minutes.get(),
+ this.valueMaxInHour.get(),
+ Arrays.toString(valueRegion())
+ ));
+ }
+
+ long[] valueRegion() {
+ long[] vrs = new long[this.valueIncDistributeRegion.length];
+ for (int i = 0; i < this.valueIncDistributeRegion.length; i++) {
+ vrs[i] = this.valueIncDistributeRegion[i].get();
+ }
+ return vrs;
+ }
+
+ public AtomicLong getValue() {
+ return value;
+ }
+
+ public String getStatsName() {
+ return statsName;
+ }
+
+ public AtomicLong getTimes() {
+ return times;
+ }
+
+ public AtomicLong[] getValueDistributeRegion() {
+ return valueIncDistributeRegion;
+ }
+
+ public AtomicLong[] getValueIncDistributeRegion() {
+ return valueIncDistributeRegion;
+ }
+
+ private void setValueIncDistributeRegion(long value) {
+ // < 1ms
+ if (value <= 0) {
+ this.valueIncDistributeRegion[0].incrementAndGet();
+ }
+ // 1ms ~ 10ms
+ else if (value < 10) {
+ this.valueIncDistributeRegion[1].incrementAndGet();
+ }
+ // 10ms ~ 100ms
+ else if (value < 100) {
+ this.valueIncDistributeRegion[2].incrementAndGet();
+ }
+ // 100ms ~ 500ms
+ else if (value < 500) {
+ this.valueIncDistributeRegion[3].incrementAndGet();
+ }
+ // 500ms ~ 1s
+ else if (value < 1000) {
+ this.valueIncDistributeRegion[4].incrementAndGet();
+ }
+ // 1s ~ 3s
+ else if (value < 3000) {
+ this.valueIncDistributeRegion[5].incrementAndGet();
+ }
+ // 3s ~ 5s
+ else if (value < 5000) {
+ this.valueIncDistributeRegion[6].incrementAndGet();
+ }
+ // 5s ~ 10s
+ else if (value < 10000) {
+ this.valueIncDistributeRegion[7].incrementAndGet();
+ }
+ // 10s ~ 30s
+ else if (value < 30000) {
+ this.valueIncDistributeRegion[8].incrementAndGet();
+ }
+ // >= 30s
+ else {
+ this.valueIncDistributeRegion[9].incrementAndGet();
+ }
+ }
+
+ public AtomicLong getValueMaxInHour() {
+ return valueMaxInHour;
+ }
+
+ public AtomicLong getValueMaxInMinutes() {
+ return valueMaxInMinutes;
+ }
+
+ public AtomicLong getValueMaxIn10Minutes() {
+ return valueMaxIn10Minutes;
+ }
+
+ public static class StatsSnapshot {
+ private long sum;
+ private double tps;
+ private double avgpt;
+
+ public long getSum() {
+ return sum;
+ }
+
+ public void setSum(long sum) {
+ this.sum = sum;
+ }
+
+ public double getTps() {
+ return tps;
+ }
+
+ public void setTps(double tps) {
+ this.tps = tps;
+ }
+
+ public double getAvgpt() {
+ return avgpt;
+ }
+
+ public void setAvgpt(double avgpt) {
+ this.avgpt = avgpt;
+ }
+ }
+
+ class CallSnapshot {
+ private final long timestamp;
+ private final long times;
+ private final long value;
+
+ public CallSnapshot(long timestamp, long times, long value) {
+ super();
+ this.timestamp = timestamp;
+ this.times = times;
+ this.value = value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public long getTimes() {
+ return times;
+ }
+
+ public long getValue() {
+ return value;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
new file mode 100644
index 0000000..79c1520
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+
+public class StatsItemSet {
+ private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = new ConcurrentHashMap<String, StatsItem>(128);
+ private final String statsName;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+ public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+ this.statsName = statsName;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ }
+
+ public void init() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInSeconds();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInMinutes();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, 0, 10, TimeUnit.MINUTES);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInHour();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, 0, 1, TimeUnit.HOURS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtHour();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtDay();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()), //
+ 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+ }
+
+ private void samplingInSeconds() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().samplingInSeconds();
+ }
+ }
+
+ private void samplingInMinutes() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().samplingInMinutes();
+ }
+ }
+
+ private void samplingInHour() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().samplingInHour();
+ }
+ }
+
+ private void printAtMinutes() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().printAtMinutes();
+ }
+ }
+
+ private void printAtHour() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().printAtHour();
+ }
+ }
+
+ private void printAtDay() {
+ for (final Map.Entry<String, StatsItem> next : this.statsItemTable.entrySet()) {
+ next.getValue().printAtDay();
+ }
+ }
+
+ void addValue(final String statsKey, final int incValue, final int incTimes) {
+ this.getAndCreateStatsItem(statsKey).addValue(incValue, incTimes);
+ }
+
+ private StatsItem getAndCreateStatsItem(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null == statsItem) {
+ statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ StatsItem prev = this.statsItemTable.put(statsKey, statsItem);
+ if (null == prev) {
+ // statsItem.init();
+ }
+ }
+
+ return statsItem;
+ }
+
+ public StatsItem.StatsSnapshot getStatsDataInMinute(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInMinute();
+ }
+ return new StatsItem.StatsSnapshot();
+ }
+
+ public StatsItem.StatsSnapshot getStatsDataInHour(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInHour();
+ }
+
+ return new StatsItem.StatsSnapshot();
+ }
+
+ public StatsItem.StatsSnapshot getStatsDataInDay(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInDay();
+ }
+ return new StatsItem.StatsSnapshot();
+ }
+
+ public StatsItem getStatsItem(final String statsKey) {
+ return this.statsItemTable.get(statsKey);
+ }
+
+ ConcurrentHashMap<String, StatsItem> getStatsItemTable() {
+ return statsItemTable;
+ }
+
+ public String getStatsName() {
+ return statsName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
new file mode 100644
index 0000000..540779e
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ThreadStats {
+ private final ConcurrentHashMap<Threading, TimestampRegion> statsTable = new ConcurrentHashMap<Threading, TimestampRegion>(64);
+
+ public void beginInvoke(final long beginTimestamp) {
+ Threading th = new Threading(Thread.currentThread().getName(), Thread.currentThread().getId());
+
+ TimestampRegion tr = this.statsTable.get(th);
+ if (null == tr) {
+ tr = new TimestampRegion();
+ this.statsTable.put(th, tr);
+ }
+
+ tr.setBeginTimestamp(beginTimestamp);
+ tr.setEndTimestamp(-1);
+ }
+
+ public void endInvoke(final long endTimestamp) {
+ Threading th = new Threading(Thread.currentThread().getName(), Thread.currentThread().getId());
+ TimestampRegion tr = this.statsTable.get(th);
+ tr.setEndTimestamp(endTimestamp);
+ }
+
+ public TreeMap<Threading, TimestampRegion> cloneStatsTable() {
+ TreeMap<Threading, TimestampRegion> result = new TreeMap<Threading, TimestampRegion>();
+
+ for (final Map.Entry<Threading, TimestampRegion> next : this.statsTable.entrySet()) {
+ result.put(next.getKey(), next.getValue());
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
new file mode 100644
index 0000000..e2c19b5
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+public class Threading implements Comparable {
+ private String name;
+ private long id;
+
+ public Threading() {
+ }
+
+ public Threading(String name, long id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + (int) (id ^ (id >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Threading threading = (Threading) o;
+
+ return id == threading.id && !(name != null ? !name.equals(threading.name) : threading.name != null);
+
+ }
+
+ @Override
+ public String toString() {
+ return "Threading{" +
+ "name='" + name + '\'' +
+ ", id=" + id +
+ '}';
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ Threading t = (Threading) o;
+ int ret = t.name.compareTo(this.name);
+ if (ret == 0) {
+ return Long.valueOf(t.id).compareTo(this.id);
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
new file mode 100644
index 0000000..cf8042b
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+public class TimestampRegion {
+ private volatile long beginTimestamp = -1;
+ private volatile long endTimestamp = -1;
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ public void setBeginTimestamp(long beginTimestamp) {
+ this.beginTimestamp = beginTimestamp;
+ }
+
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ public void setEndTimestamp(long endTimestamp) {
+ this.endTimestamp = endTimestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (beginTimestamp ^ (beginTimestamp >>> 32));
+ result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TimestampRegion that = (TimestampRegion) o;
+ return beginTimestamp == that.beginTimestamp && endTimestamp == that.endTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "TimestampRegion{" +
+ "beginTimestamp=" + beginTimestamp +
+ ", endTimestamp=" + endTimestamp +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
new file mode 100644
index 0000000..376ccda
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+
+public class UtilAll {
+
+ public static String jstack() {
+ return jstack(Thread.getAllStackTraces());
+ }
+
+ private static String jstack(Map<Thread, StackTraceElement[]> map) {
+ StringBuilder result = new StringBuilder();
+ try {
+ for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+ StackTraceElement[] elements = entry.getValue();
+ Thread thread = entry.getKey();
+ if (elements != null && elements.length > 0) {
+ String threadName = entry.getKey().getName();
+ result.append(String.format("%-40s TID: %d STATE: %s\n", threadName, thread.getId(), thread.getState()));
+ for (StackTraceElement el : elements) {
+ result.append(String.format("%-40s %s\n", threadName, el.toString()));
+ }
+ result.append("\n");
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+
+ return result.toString();
+ }
+
+ public static String jstack(final String threadName, final long threadId) {
+ Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
+ StringBuilder result = new StringBuilder();
+ try {
+ for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+ StackTraceElement[] elements = entry.getValue();
+ Thread thread = entry.getKey();
+ if (elements != null && elements.length > 0) {
+ if (threadName.equals(entry.getKey().getName()) && threadId == entry.getKey().getId()) {
+ result.append(String.format("%-40s TID: %d STATE: %s\n", threadName, thread.getId(), thread.getState()));
+ for (StackTraceElement el : elements) {
+ result.append(String.format("%-40s %s\n", threadName, el.toString()));
+ }
+ }
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ return result.toString();
+ }
+
+ public static ExecuteResult callShellCommand(final String shellString) {
+ Process process = null;
+ try {
+ String[] cmdArray = shellString.split(" ");
+ process = Runtime.getRuntime().exec(cmdArray);
+ process.waitFor();
+ } catch (Throwable ignored) {
+ } finally {
+ if (null != process)
+ process.destroy();
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
new file mode 100644
index 0000000..e7330b8
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.processor;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.rpc.annotation.RemoteService;
+import org.apache.rocketmq.rpc.impl.command.ResponseCode;
+import org.apache.rocketmq.rpc.impl.context.RpcProviderContext;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionInvokeMessage;
+import org.apache.rocketmq.rpc.impl.metrics.ServiceStats;
+import org.apache.rocketmq.rpc.impl.service.RpcEntry;
+import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract;
+import org.apache.rocketmq.rpc.impl.service.RpcServiceCallBody;
+import org.apache.rocketmq.rpc.internal.ServiceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcRequestProcessor implements RequestProcessor {
+ private static final Logger log = LoggerFactory.getLogger(RpcRequestProcessor.class);
+ private Map<String, RpcEntry> serviceTable = new ConcurrentHashMap<>(64);
+ private Map<String, ExecutorService> executorTable = new ConcurrentHashMap<>(64);
+ private ThreadLocal<RpcProviderContext> threadLocalProviderContext;
+ private RpcInstanceAbstract rpcInstanceAbstract;
+ private ServiceStats serviceStats;
+
+ public RpcRequestProcessor(ThreadLocal<RpcProviderContext> threadLocalProviderContext,
+ final RpcInstanceAbstract rpcInstanceAbstract, ServiceStats stats) {
+ this.threadLocalProviderContext = threadLocalProviderContext;
+ this.rpcInstanceAbstract = rpcInstanceAbstract;
+ this.serviceStats = stats;
+ }
+
+ public Set<String> putNewService(final Object obj) {
+ return putNewService(obj, null);
+ }
+
+ public Set<String> putNewService(final Object obj, final ExecutorService executorService) {
+ Class<?>[] interfaces = obj.getClass().getInterfaces();
+ for (Class<?> itf : interfaces) {
+ RemoteService serviceExport = itf.getAnnotation(RemoteService.class);
+ if (null == serviceExport) {
+ log.warn("Service:{} is not remark annotation", itf.getName());
+ continue;
+ }
+
+ Method[] methods = itf.getMethods();
+ for (Method method : methods) {
+ if (!ServiceUtil.testServiceExportMethod(method)) {
+ log.error("The method: [{}] not matched RPC standard", method.toGenericString());
+ continue;
+ }
+
+ String requestCode = ServiceUtil.toRequestCode(serviceExport, method);
+ RpcEntry se = new RpcEntry();
+ se.setServiceExport(serviceExport);
+ se.setObject(obj);
+ se.setMethod(method);
+ this.serviceTable.put(requestCode, se);
+ if (executorService != null) {
+ this.executorTable.put(requestCode, executorService);
+ }
+ }
+ }
+ return this.serviceTable.keySet();
+ }
+
+ @Override
+ public RemotingCommand processRequest(RemotingChannel channel, RemotingCommand request) {
+ RpcProviderContext rpcProviderContext = new RpcProviderContext();
+ rpcProviderContext.setRemotingChannel(channel);
+ rpcProviderContext.setRemotingRequest(request);
+ rpcProviderContext.setRemotingResponse(rpcInstanceAbstract.remotingService().commandFactory().createResponse(request));
+ rpcProviderContext.setReturnResponse(true);
+ threadLocalProviderContext.set(rpcProviderContext);
+ this.processRequest0(rpcProviderContext, request);
+ threadLocalProviderContext.remove();
+
+ if (rpcProviderContext.isReturnResponse()) {
+ return rpcProviderContext.getRemotingResponse();
+ }
+ return null;
+ }
+
+ private Object[] buildParameter(final RpcProviderContext context, final RemotingCommand request,
+ RpcServiceCallBody serviceCallBody, Type[] parameterTypes) {
+ Object[] parameters = new Object[parameterTypes.length];
+ try {
+ int index = 0;
+ Serializer serialization = this.rpcInstanceAbstract.remotingService()
+ .serializerFactory().get(request.serializerType());
+ for (Type parameterType : parameterTypes) {
+ parameters[index] = serialization.decode(serviceCallBody.getParameter(index), parameterType);
+ index++;
+ }
+ } catch (Exception e) {
+ ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage();
+ serviceExceptionInvokeMessage.setClassFullName(e.getClass().getName());
+ serviceExceptionInvokeMessage.setErrorMessage(serializeException(request.serializerType(), e));
+ context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR);
+ context.getRemotingResponse().parameter(serviceExceptionInvokeMessage);
+ }
+ return parameters;
+ }
+
+ private String serializeException(byte serializeType, Exception exception) {
+ Serializer serialization = rpcInstanceAbstract.remotingService().serializerFactory().get(serializeType);
+ return serialization.encode(exception).toString();
+ }
+
+ private void dealWithException(final RpcProviderContext context, RemotingCommand request, Exception exception) {
+ if (exception instanceof InvocationTargetException) {
+ context.getRemotingResponse().opCode(ResponseCode.USER_SERVICE_EXCEPTION);
+ } else if (exception instanceof IllegalArgumentException) {
+ context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR);
+ } else if (exception instanceof IllegalAccessException) {
+ context.getRemotingResponse().opCode(ResponseCode.ILLEGAL_ACCESS);
+ } else if (exception instanceof NullPointerException) {
+ context.getRemotingResponse().opCode(ResponseCode.NULL_POINTER);
+ } else {
+ context.getRemotingResponse().opCode(ResponseCode.USER_SERVICE_EXCEPTION);
+ }
+ String remarkMsg, exceptionMsg, exceptionClassname;
+ ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage();
+ if (exception instanceof InvocationTargetException) {
+ exceptionMsg = ((InvocationTargetException) exception).getTargetException().getMessage();
+ remarkMsg = exceptionMsg == null ? "" : exceptionMsg;
+ exceptionClassname = ((InvocationTargetException) exception).getTargetException().getClass().getName();
+ } else {
+ exceptionMsg = exception.getMessage();
+ remarkMsg = exceptionMsg == null ? "" : exceptionMsg;
+ exceptionClassname = exception.getClass().getName();
+ }
+ serviceExceptionInvokeMessage.setClassFullName(exceptionClassname);
+ serviceExceptionInvokeMessage.setErrorMessage(remarkMsg);
+ if (exception.getCause() != null)
+ serviceExceptionInvokeMessage.setThrowable(exception.getCause());
+ Object[] args = new Object[] {serviceExceptionInvokeMessage};
+ Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType());
+ if (serializer != null)
+ context.getRemotingResponse().serializerType(serializer.type());
+ context.getRemotingResponse().parameter(serviceExceptionInvokeMessage);
+ }
+
+ private void processRequest0(final RpcProviderContext context, final RemotingCommand request) {
+ final RpcServiceCallBody serviceCallBody =
+ request.parameter(this.rpcInstanceAbstract.remotingService().serializerFactory(), RpcServiceCallBody.class);
+ final RpcEntry entry = this.serviceTable.get(serviceCallBody.getServiceId());
+ if (entry != null) {
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ Type[] parameterTypes = entry.getMethod().getGenericParameterTypes();
+ Object result = null;
+ Exception exception = null;
+ long startTime = System.currentTimeMillis();
+ try {
+ if (parameterTypes.length == 0) {
+ result = entry.getMethod().invoke(entry.getObject());
+ } else {
+ Object[] parameters = buildParameter(context, request, serviceCallBody, parameterTypes);
+ result = entry.getMethod().invoke(entry.getObject(), parameters);
+ }
+ serviceStats.addProviderOKQPSValue(serviceCallBody.getServiceId(), 1, 1);
+ serviceStats.addProviderRTValue(serviceCallBody.getServiceId(), (int) (System.currentTimeMillis() - startTime), 1);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception != null)
+ dealWithException(context, request, exception);
+ else if (!entry.getMethod().getReturnType().equals(Void.class)) {
+ Object[] args = new Object[] {result};
+ Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType());
+ if (serializer != null)
+ context.getRemotingResponse().serializerType(serializer.type());
+ context.getRemotingResponse().parameter(result);
+ }
+ }
+ };
+ ExecutorService executorService = this.executorTable.get(serviceCallBody.getServiceId());
+ if (executorService != null) {
+ executorService.submit(runnable);
+ } else {
+ runnable.run();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
new file mode 100644
index 0000000..0be1014
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.promise;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.rpc.api.Promise;
+import org.apache.rocketmq.rpc.api.PromiseListener;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager;
+import org.apache.rocketmq.rpc.internal.RpcErrorMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultPromise<V> implements Promise<V> {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+ private final Object lock = new Object();
+ private volatile FutureState state = FutureState.DOING;
+ private V result = null;
+ private long timeout;
+ private long createTime;
+ private Throwable exception = null;
+ private List<PromiseListener<V>> promiseListenerList;
+
+ public DefaultPromise() {
+ createTime = System.currentTimeMillis();
+ promiseListenerList = new ArrayList<>();
+ timeout = 5000;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.isCancelledState();
+ }
+
+ @Override
+ public boolean isDone() {
+ return state.isDoneState();
+ }
+
+ @Override
+ public V get() {
+ return result;
+ }
+
+ @Override
+ public V get(final long timeout) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return getValueOrThrowable();
+ }
+
+ if (timeout <= 0) {
+ try {
+ lock.wait();
+ } catch (Exception e) {
+ cancel(e);
+ }
+ return getValueOrThrowable();
+ } else {
+ long waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime > 0) {
+ for (; ; ) {
+ try {
+ lock.wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+ }
+
+ if (!isDoing()) {
+ break;
+ } else {
+ waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime <= 0) {
+ break;
+ }
+ }
+ }
+ }
+
+ if (isDoing()) {
+ timeoutSoCancel();
+ }
+ }
+ return getValueOrThrowable();
+ }
+ }
+
+ @Override
+ public boolean set(final V value) {
+ if (value == null)
+ return false;
+ this.result = value;
+ return done();
+ }
+
+ @Override
+ public boolean setFailure(final Throwable cause) {
+ if (cause == null)
+ return false;
+ this.exception = cause;
+ return done();
+ }
+
+ @Override
+ public void addListener(final PromiseListener<V> listener) {
+ if (listener == null) {
+ throw new NullPointerException("FutureListener is null");
+ }
+
+ boolean notifyNow = false;
+ synchronized (lock) {
+ if (!isDoing()) {
+ notifyNow = true;
+ } else {
+ if (promiseListenerList == null) {
+ promiseListenerList = new ArrayList<>();
+ }
+ promiseListenerList.add(listener);
+ }
+ }
+
+ if (notifyNow) {
+ notifyListener(listener);
+ }
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return exception;
+ }
+
+ private void notifyListeners() {
+ if (promiseListenerList != null) {
+ for (PromiseListener<V> listener : promiseListenerList) {
+ notifyListener(listener);
+ }
+ }
+ }
+
+ private boolean isSuccess() {
+ return isDone() && (exception == null);
+ }
+
+ private void timeoutSoCancel() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return;
+ }
+ state = FutureState.CANCELLED;
+ exception = new RuntimeException("Get request result is timeout or interrupted");
+ lock.notifyAll();
+ }
+ notifyListeners();
+ }
+
+ private V getValueOrThrowable() {
+ if (exception != null) {
+ Throwable e = exception.getCause() != null ? exception.getCause() : exception;
+ throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e);
+ }
+ notifyListeners();
+ return result;
+ }
+
+ private boolean isDoing() {
+ return state.isDoingState();
+ }
+
+ private boolean done() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.DONE;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+
+ private void notifyListener(final PromiseListener<V> listener) {
+ try {
+ if (exception != null)
+ listener.operationFailed(this);
+ else
+ listener.operationCompleted(this);
+ } catch (Throwable t) {
+ LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+ }
+ }
+
+ private boolean cancel(Exception e) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.CANCELLED;
+ exception = e;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
new file mode 100644
index 0000000..aebc5e3
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.promise;
+
+public enum FutureState {
+ /**
+ * the task is doing
+ **/
+ DOING(0),
+ /**
+ * the task is done
+ **/
+ DONE(1),
+ /**
+ * ths task is cancelled
+ **/
+ CANCELLED(2);
+
+ public final int value;
+
+ private FutureState(int value) {
+ this.value = value;
+ }
+
+ public boolean isCancelledState() {
+ return this == CANCELLED;
+ }
+
+ public boolean isDoneState() {
+ return this == DONE;
+ }
+
+ public boolean isDoingState() {
+ return this == DOING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
new file mode 100644
index 0000000..ba0180c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.rpc.impl.server;
+
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.annotation.MethodType;
+import org.apache.rocketmq.rpc.api.AdvancedServer;
+import org.apache.rocketmq.rpc.api.Promise;
+import org.apache.rocketmq.rpc.impl.service.RpcJdkProxy;
+import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory;
+
+public class AdvancedServerImpl implements AdvancedServer {
+ private final SimpleServerImpl simpleServer;
+
+ public AdvancedServerImpl(final SimpleServerImpl simpleServer) {
+ this.simpleServer = simpleServer;
+ }
+
+ @Override
+ public <T> T callSync(final RemotingChannel channel, final String serviceCode, final String version,
+ final Object[] parameter, final Class<T> responseType) throws Exception {
+ RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+ RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel);
+ return (T) simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.SYNC);
+ }
+
+ @Override
+ public <T> Promise<T> callAsync(final RemotingChannel channel, final String serviceCode, final String version,
+ final Object[] parameter, final Class<T> responseType) throws Exception {
+ RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+ RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel);
+ return (Promise<T>) simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.ASYNC);
+ }
+
+ @Override
+ public void callOneway(final RemotingChannel channel, final String serviceCode, final String version,
+ final Object[] parameter) throws Exception {
+ RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+ RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel);
+ simpleServer.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, Void.TYPE, MethodType.ASYNC);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
new file mode 100644
index 0000000..e076cbe
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.server;
+
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RemotingService;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.netty.RemotingBootstrapFactory;
+import org.apache.rocketmq.rpc.api.AdvancedServer;
+import org.apache.rocketmq.rpc.api.SimpleServer;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract;
+import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory;
+
+public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServer {
+ private RemotingServer remotingServer;
+ private ExecutorService callServiceThreadPool;
+ private RpcCommonConfig rpcCommonConfig;
+
+ public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
+ this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
+ this.rpcCommonConfig = remotingConfig;
+ this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
+ TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
+ "serverCallServiceThread", true);
+ }
+
+ public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) {
+ super(remotingConfig);
+ this.remotingServer = remotingServer;
+ }
+
+ @Override
+ public RemotingService remotingService() {
+ return this.remotingServer;
+ }
+
+ @Override
+ public void registerServiceListener() {
+
+ }
+
+ @Override
+ public <T> T bind(final Class<T> service, final RemotingChannel channel, final Properties properties) {
+ return this.narrow0(service, RpcProxyFactory.createServiceProxy(service, this, remotingServer, rpcCommonConfig, channel));
+ }
+
+ @Override
+ public AdvancedServer advancedServer() {
+ return new AdvancedServerImpl(this);
+ }
+
+ @Override
+ public void publish(final Object service) {
+ this.publishService0(service);
+ }
+
+ @Override
+ public void publish(final Object service, final ExecutorService executorService) {
+ this.publishService0(service, executorService);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ this.remotingServer.start();
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ ThreadUtils.shutdownGracefully(this.callServiceThreadPool, 3000, TimeUnit.MILLISECONDS);
+ this.remotingServer.stop();
+ }
+
+ public RemotingServer getRemotingServer() {
+ return remotingServer;
+ }
+
+ public void setRemotingServer(final RemotingServer remotingServer) {
+ this.remotingServer = remotingServer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
new file mode 100644
index 0000000..c0a0e8c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+
+public class RpcConnectionListener implements ChannelEventListener {
+ private RpcInstanceAbstract rpcInstanceAbstract;
+
+ public RpcConnectionListener(RpcInstanceAbstract rpcInstanceAbstract) {
+ this.rpcInstanceAbstract = rpcInstanceAbstract;
+ }
+
+ @Override
+ public void onChannelConnect(final RemotingChannel remotingChannel) {
+ this.rpcInstanceAbstract.registerServiceListener();
+ }
+
+ @Override
+ public void onChannelClose(final RemotingChannel remotingChannel) {
+
+ }
+
+ @Override
+ public void onChannelException(final RemotingChannel remotingChannel) {
+
+ }
+
+ @Override
+ public void onChannelIdle(final RemotingChannel remotingChannel) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
new file mode 100644
index 0000000..9bfd53a
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.rpc.annotation.RemoteService;
+
+public class RpcEntry {
+ private RemoteService serviceExport;
+ private Method method;
+ private Object object;
+
+ public Method getMethod() {
+ return method;
+ }
+
+ public void setMethod(Method method) {
+ this.method = method;
+ }
+
+ public Object getObject() {
+ return object;
+ }
+
+ public void setObject(Object object) {
+ this.object = object;
+ }
+
+ public RemoteService getServiceExport() {
+ return serviceExport;
+ }
+
+ public void setServiceExport(RemoteService serviceExport) {
+ this.serviceExport = serviceExport;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
new file mode 100644
index 0000000..2b1288c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.rpc.impl.command.RpcRequestCode;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+import org.apache.rocketmq.rpc.impl.context.RpcProviderContext;
+import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl;
+import org.apache.rocketmq.rpc.impl.metrics.ThreadStats;
+import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor;
+
+import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory;
+
+public abstract class RpcInstanceAbstract extends RpcProxyCommon {
+ protected final RpcRequestProcessor rpcRequestProcessor;
+ protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>();
+ protected final RpcCommonConfig rpcCommonConfig;
+ protected ThreadStats threadStats;
+ private DefaultServiceAPIImpl defaultServiceAPI;
+ private ThreadPoolExecutor invokeServiceThreadPool;
+
+ public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) {
+ super(rpcCommonConfig);
+ this.threadStats = new ThreadStats();
+ this.rpcCommonConfig = rpcCommonConfig;
+ this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
+
+ this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true));
+
+ }
+
+ public void start() {
+ this.defaultServiceAPI = new DefaultServiceAPIImpl(this.serviceStats, threadStats);
+ this.serviceStats.start();
+ this.publishService0(this.defaultServiceAPI);
+ this.remotingService().registerRequestProcessor(RpcRequestCode.CALL_SERVICE, this.rpcRequestProcessor, this.invokeServiceThreadPool);
+ }
+
+ public void stop() {
+ this.serviceStats.stop();
+ ThreadUtils.shutdownGracefully(this.invokeServiceThreadPool, 3000, TimeUnit.MILLISECONDS);
+ }
+
+ protected void publishService0(Object service) {
+ this.rpcRequestProcessor.putNewService(service);
+ }
+
+ protected void publishService0(Object service, ExecutorService executorService) {
+ this.rpcRequestProcessor.putNewService(service, executorService);
+ }
+
+ protected <T> T narrow0(Class<T> service, RpcJdkProxy rpcJdkProxy) {
+ return rpcJdkProxy.newInstance(service);
+ }
+
+ public abstract void registerServiceListener();
+
+ public ThreadPoolExecutor getInvokeServiceThreadPool() {
+ return invokeServiceThreadPool;
+ }
+
+ public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) {
+ this.invokeServiceThreadPool = invokeServiceThreadPool;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
new file mode 100644
index 0000000..351c8b4
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager;
+import org.apache.rocketmq.rpc.internal.RpcErrorMapper;
+
+public abstract class RpcJdkProxy implements InvocationHandler {
+ private final Class<?> service;
+ private final RpcProxyCommon rpcProxyCommon;
+
+ public RpcJdkProxy(final Class<?> service, final RpcProxyCommon rpcProxyCommon) {
+ this.service = service;
+ this.rpcProxyCommon = rpcProxyCommon;
+ }
+
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+ return rpcProxyCommon.invoke0(proxy, this, service, method, args);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T newInstance(Class<T> service) {
+ try {
+ return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class[] {service}, this);
+ } catch (Exception e) {
+ throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e);
+ }
+ }
+
+ public abstract void invokeOneWay(final RemotingCommand request);
+
+ public abstract void invokeAsync(final RemotingCommand request, final AsyncHandler handler);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
new file mode 100644
index 0000000..4cff74f
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+
+public class RpcJdkProxyClient extends RpcJdkProxy {
+ private RemotingClient remotingClient;
+ private String remotingAddress;
+ private RpcCommonConfig rpcCommonConfig;
+
+ public RpcJdkProxyClient(final Class<?> service,
+ final RpcProxyCommon rpcProxyCommon,
+ final RemotingClient remotingClient,
+ final RpcCommonConfig rpcCommonConfig,
+ final String remotingAddress) {
+ super(service, rpcProxyCommon);
+ this.remotingClient = remotingClient;
+ this.rpcCommonConfig = rpcCommonConfig;
+ this.remotingAddress = remotingAddress;
+ }
+
+ @Override
+ public void invokeOneWay(final RemotingCommand request) {
+ this.remotingClient.invokeOneWay(remotingAddress, request, rpcCommonConfig.getServiceInvokeTimeout());
+ }
+
+ @Override
+ public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) {
+ this.remotingClient.invokeAsync(remotingAddress, request, handler, rpcCommonConfig.getServiceInvokeTimeout());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
new file mode 100644
index 0000000..c67c8cc
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+
+public class RpcJdkProxyServer extends RpcJdkProxy {
+ private final RemotingServer remotingServer;
+ private final RemotingChannel remotingChannel;
+ private final RpcCommonConfig rpcCommonConfig;
+
+ public RpcJdkProxyServer(final Class<?> service,
+ final RpcProxyCommon rpcProxyCommon,
+ final RemotingServer remotingServer,
+ final RpcCommonConfig rpcCommonConfig,
+ final RemotingChannel remotingChannel) {
+ super(service, rpcProxyCommon);
+ this.remotingServer = remotingServer;
+ this.remotingChannel = remotingChannel;
+ this.rpcCommonConfig = rpcCommonConfig;
+ }
+
+ @Override
+ public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) {
+ this.remotingServer.invokeAsync(remotingChannel, request, handler, rpcCommonConfig.getServiceInvokeTimeout());
+ }
+
+ @Override
+ public void invokeOneWay(final RemotingCommand request) {
+ this.remotingServer.invokeOneWay(remotingChannel, request, rpcCommonConfig.getServiceInvokeTimeout());
+ }
+}