You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:46 UTC

[2/8] incubator-rocketmq git commit: initialize RocketMQ5

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
new file mode 100644
index 0000000..219ee6e
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItem.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+
+public class StatsItem {
+    private final AtomicLong value = new AtomicLong(0);
+    private final AtomicLong times = new AtomicLong(0);
+    private final AtomicLong[] valueIncDistributeRegion = new AtomicLong[10];
+    private final AtomicLong valueMaxInMinutes = new AtomicLong(0);
+    private final AtomicLong valueMaxIn10Minutes = new AtomicLong(0);
+    private final AtomicLong valueMaxInHour = new AtomicLong(0);
+
+    private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final String statsName;
+    private final String statsKey;
+    private final Logger log;
+
+    /**
+     * Creates a statistics item and zero-initialises every slot of the distribution histogram.
+     * No task is scheduled here; scheduling only happens in {@link #init()}.
+     *
+     * @param statsName name of the statistic, printed as the first tag of each log line
+     * @param statsKey key of this item, printed as the second tag of each log line
+     * @param scheduledExecutorService scheduler used by {@link #init()}
+     * @param log logger that receives the periodic statistics lines
+     */
+    public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.log = log;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.statsKey = statsKey;
+        this.statsName = statsName;
+
+        int slot = 0;
+        while (slot < this.valueIncDistributeRegion.length) {
+            this.valueIncDistributeRegion[slot] = new AtomicLong(0);
+            slot++;
+        }
+    }
+
+    /**
+     * Raises {@code target} to {@code value} if, and only if, {@code value} is strictly greater
+     * than what {@code target} currently holds. The update is retried with compare-and-set until
+     * it either succeeds or another writer has already stored a value that is at least as large.
+     *
+     * @param target the counter holding the current maximum
+     * @param value the candidate maximum
+     * @return {@code true} if this call stored {@code value}; {@code false} if the counter was left untouched
+     */
+    public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
+        for (long current = target.get(); value > current; current = target.get()) {
+            if (target.compareAndSet(current, value)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Derives sum, TPS and average value per call for one sampling window from the oldest and
+     * the newest snapshot in {@code csList}.
+     * <p>
+     * The list is locked while it is read. All three figures stay at zero when the list is empty.
+     * TPS also stays at zero when no time has elapsed between the two snapshots (for example when
+     * the window holds a single snapshot), so callers never see {@code NaN} or {@code Infinity}.
+     *
+     * @param csList snapshots of one window, oldest first
+     * @return a freshly created snapshot of the computed figures
+     */
+    private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+        StatsSnapshot statsSnapshot = new StatsSnapshot();
+        synchronized (csList) {
+            double tps = 0;
+            double avgpt = 0;
+            long sum = 0;
+            if (!csList.isEmpty()) {
+                CallSnapshot first = csList.getFirst();
+                CallSnapshot last = csList.getLast();
+                sum = last.getValue() - first.getValue();
+
+                // A single snapshot (first == last) or two snapshots taken within the same
+                // millisecond give a zero divisor; leave TPS at 0 instead of NaN/Infinity.
+                long elapsedMillis = last.getTimestamp() - first.getTimestamp();
+                if (elapsedMillis > 0) {
+                    tps = (sum * 1000.0d) / elapsedMillis;
+                }
+
+                long timesDiff = last.getTimes() - first.getTimes();
+                if (timesDiff > 0) {
+                    avgpt = (sum * 1.0d) / timesDiff;
+                }
+            }
+
+            statsSnapshot.setSum(sum);
+            statsSnapshot.setTps(tps);
+            statsSnapshot.setAvgpt(avgpt);
+        }
+
+        return statsSnapshot;
+    }
+
+    /**
+     * Computes the start of the next minute in the default calendar/time zone.
+     *
+     * @return epoch milliseconds of "now plus one minute" with seconds and milliseconds set to zero
+     */
+    public static long computNextMinutesTimeMillis() {
+        final Calendar nextMinute = Calendar.getInstance();
+        nextMinute.setTimeInMillis(System.currentTimeMillis());
+
+        // Step forward one minute, then truncate everything below the minute.
+        nextMinute.add(Calendar.MINUTE, 1);
+        nextMinute.set(Calendar.SECOND, 0);
+        nextMinute.set(Calendar.MILLISECOND, 0);
+
+        return nextMinute.getTimeInMillis();
+    }
+
+    /**
+     * Computes the start of the next hour in the default calendar/time zone.
+     *
+     * @return epoch milliseconds of "now plus one hour" with minutes, seconds and milliseconds set to zero
+     */
+    public static long computNextHourTimeMillis() {
+        final Calendar nextHour = Calendar.getInstance();
+        nextHour.setTimeInMillis(System.currentTimeMillis());
+
+        // Step forward one hour, then truncate everything below the hour.
+        nextHour.add(Calendar.HOUR_OF_DAY, 1);
+        nextHour.set(Calendar.MINUTE, 0);
+        nextHour.set(Calendar.SECOND, 0);
+        nextHour.set(Calendar.MILLISECOND, 0);
+
+        return nextHour.getTimeInMillis();
+    }
+
+    /**
+     * Computes the next midnight in the default calendar/time zone.
+     *
+     * @return epoch milliseconds of tomorrow at 00:00:00.000
+     */
+    public static long computNextMorningTimeMillis() {
+        final Calendar nextMidnight = Calendar.getInstance();
+        nextMidnight.setTimeInMillis(System.currentTimeMillis());
+
+        // Step forward one day, then truncate everything below the day.
+        nextMidnight.add(Calendar.DAY_OF_MONTH, 1);
+        nextMidnight.set(Calendar.HOUR_OF_DAY, 0);
+        nextMidnight.set(Calendar.MINUTE, 0);
+        nextMidnight.set(Calendar.SECOND, 0);
+        nextMidnight.set(Calendar.MILLISECOND, 0);
+
+        return nextMidnight.getTimeInMillis();
+    }
+
+    /**
+     * Records one observation: adds to the running totals, counts the observation in the
+     * distribution histogram and raises the three "maximum seen" markers where applicable.
+     *
+     * @param incValue amount added to the accumulated value; also the figure that is bucketed
+     *                 and compared against the maxima
+     * @param incTimes amount added to the accumulated call count
+     */
+    public void addValue(final int incValue, final int incTimes) {
+        this.value.addAndGet(incValue);
+        this.times.addAndGet(incTimes);
+        this.setValueIncDistributeRegion(incValue);
+
+        final long observed = incValue;
+        compareAndIncreaseOnly(this.valueMaxInMinutes, observed);
+        compareAndIncreaseOnly(this.valueMaxIn10Minutes, observed);
+        compareAndIncreaseOnly(this.valueMaxInHour, observed);
+    }
+
+    /**
+     * @return sum/TPS/average computed over the snapshots collected by {@link #samplingInSeconds()}
+     */
+    public StatsSnapshot getStatsDataInMinute() {
+        return computeStatsData(this.csListMinute);
+    }
+
+    /**
+     * @return sum/TPS/average computed over the snapshots collected by {@link #samplingInMinutes()}
+     */
+    public StatsSnapshot getStatsDataInHour() {
+        return computeStatsData(this.csListHour);
+    }
+
+    /**
+     * @return sum/TPS/average computed over the snapshots collected by {@link #samplingInHour()}
+     */
+    public StatsSnapshot getStatsDataInDay() {
+        return computeStatsData(this.csListDay);
+    }
+
+    public void init() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      samplingInMinutes();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60 * 10, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtMinutes();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtHour();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtDay();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, //
+            1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Appends a snapshot of the current totals to the one-minute window and drops the oldest
+     * entry once the window holds more than 7 snapshots.
+     */
+    public void samplingInSeconds() {
+        synchronized (this.csListMinute) {
+            final CallSnapshot sample = new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get());
+            this.csListMinute.addLast(sample);
+            if (this.csListMinute.size() >= 8) {
+                this.csListMinute.removeFirst();
+            }
+        }
+    }
+
+    /**
+     * Appends a snapshot of the current totals to the one-hour window, dropping the oldest entry
+     * once the window holds more than 7 snapshots, and then resets the ten-minute maximum.
+     */
+    public void samplingInMinutes() {
+        synchronized (this.csListHour) {
+            final CallSnapshot sample = new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get());
+            this.csListHour.addLast(sample);
+            if (this.csListHour.size() >= 8) {
+                this.csListHour.removeFirst();
+            }
+        }
+
+        // A new ten-minute period starts with every sample.
+        this.valueMaxIn10Minutes.set(0);
+    }
+
+    /**
+     * Appends a snapshot of the current totals to the one-day window and drops the oldest entry
+     * once the window holds more than 25 snapshots.
+     */
+    public void samplingInHour() {
+        synchronized (this.csListDay) {
+            final CallSnapshot sample = new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get());
+            this.csListDay.addLast(sample);
+            if (this.csListDay.size() >= 26) {
+                this.csListDay.removeFirst();
+            }
+        }
+    }
+
+    /**
+     * Logs the one-minute statistics together with the current maxima and the distribution
+     * histogram, then resets the per-minute maximum.
+     */
+    public void printAtMinutes() {
+        final StatsSnapshot minuteStats = computeStatsData(this.csListMinute);
+        final String line = String.format(
+            "[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+            this.statsName,
+            this.statsKey,
+            minuteStats.getSum(),
+            minuteStats.getTps(),
+            minuteStats.getAvgpt(),
+            this.valueMaxInMinutes.get(),
+            this.valueMaxIn10Minutes.get(),
+            this.valueMaxInHour.get(),
+            Arrays.toString(valueRegion()));
+        log.info(line);
+
+        this.valueMaxInMinutes.set(0);
+    }
+
+    /**
+     * Logs the one-hour statistics together with the current maxima and the distribution
+     * histogram, then resets the per-hour maximum.
+     */
+    public void printAtHour() {
+        final StatsSnapshot hourStats = computeStatsData(this.csListHour);
+        final String line = String.format(
+            "[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+            this.statsName,
+            this.statsKey,
+            hourStats.getSum(),
+            hourStats.getTps(),
+            hourStats.getAvgpt(),
+            this.valueMaxInMinutes.get(),
+            this.valueMaxIn10Minutes.get(),
+            this.valueMaxInHour.get(),
+            Arrays.toString(valueRegion()));
+        log.info(line);
+
+        this.valueMaxInHour.set(0);
+    }
+
+    /**
+     * Logs the one-day statistics together with the current maxima and the distribution
+     * histogram. No counter is reset by this method.
+     */
+    public void printAtDay() {
+        final StatsSnapshot dayStats = computeStatsData(this.csListDay);
+        final String line = String.format(
+            "[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.4f valueMaxInMinutes: %d valueMaxIn10Minutes: %d valueMaxInHour: %d valueIncDistributeRegion: %s",
+            this.statsName,
+            this.statsKey,
+            dayStats.getSum(),
+            dayStats.getTps(),
+            dayStats.getAvgpt(),
+            this.valueMaxInMinutes.get(),
+            this.valueMaxIn10Minutes.get(),
+            this.valueMaxInHour.get(),
+            Arrays.toString(valueRegion()));
+        log.info(line);
+    }
+
+    long[] valueRegion() {
+        long[] vrs = new long[this.valueIncDistributeRegion.length];
+        for (int i = 0; i < this.valueIncDistributeRegion.length; i++) {
+            vrs[i] = this.valueIncDistributeRegion[i].get();
+        }
+        return vrs;
+    }
+
+    /**
+     * @return the live counter accumulating every {@code incValue} passed to {@code addValue}
+     */
+    public AtomicLong getValue() {
+        return value;
+    }
+
+    /**
+     * @return the statistic name given at construction
+     */
+    public String getStatsName() {
+        return statsName;
+    }
+
+    /**
+     * @return the live counter accumulating every {@code incTimes} passed to {@code addValue}
+     */
+    public AtomicLong getTimes() {
+        return times;
+    }
+
+    /**
+     * Returns the same array as {@link #getValueIncDistributeRegion()}.
+     *
+     * @return the internal (not copied) array of histogram counters
+     */
+    public AtomicLong[] getValueDistributeRegion() {
+        return valueIncDistributeRegion;
+    }
+
+    /**
+     * @return the internal (not copied) array of histogram counters
+     */
+    public AtomicLong[] getValueIncDistributeRegion() {
+        return valueIncDistributeRegion;
+    }
+
+    private void setValueIncDistributeRegion(long value) {
+        // < 1ms
+        if (value <= 0) {
+            this.valueIncDistributeRegion[0].incrementAndGet();
+        }
+        // 1ms ~ 10ms
+        else if (value < 10) {
+            this.valueIncDistributeRegion[1].incrementAndGet();
+        }
+        // 10ms ~ 100ms
+        else if (value < 100) {
+            this.valueIncDistributeRegion[2].incrementAndGet();
+        }
+        // 100ms ~ 500ms
+        else if (value < 500) {
+            this.valueIncDistributeRegion[3].incrementAndGet();
+        }
+        // 500ms ~ 1s
+        else if (value < 1000) {
+            this.valueIncDistributeRegion[4].incrementAndGet();
+        }
+        // 1s ~ 3s
+        else if (value < 3000) {
+            this.valueIncDistributeRegion[5].incrementAndGet();
+        }
+        // 3s ~ 5s
+        else if (value < 5000) {
+            this.valueIncDistributeRegion[6].incrementAndGet();
+        }
+        // 5s ~ 10s
+        else if (value < 10000) {
+            this.valueIncDistributeRegion[7].incrementAndGet();
+        }
+        // 10s ~ 30s
+        else if (value < 30000) {
+            this.valueIncDistributeRegion[8].incrementAndGet();
+        }
+        // >= 30s
+        else {
+            this.valueIncDistributeRegion[9].incrementAndGet();
+        }
+    }
+
+    /**
+     * @return the live maximum marker that {@code printAtHour} resets to zero
+     */
+    public AtomicLong getValueMaxInHour() {
+        return valueMaxInHour;
+    }
+
+    /**
+     * @return the live maximum marker that {@code printAtMinutes} resets to zero
+     */
+    public AtomicLong getValueMaxInMinutes() {
+        return valueMaxInMinutes;
+    }
+
+    /**
+     * @return the live maximum marker that {@code samplingInMinutes} resets to zero
+     */
+    public AtomicLong getValueMaxIn10Minutes() {
+        return valueMaxIn10Minutes;
+    }
+
+    /**
+     * Mutable holder for the figures computed over one sampling window. A newly created
+     * instance reports zero for every figure.
+     */
+    public static class StatsSnapshot {
+        // Difference of the accumulated value between the newest and the oldest snapshot.
+        private long sum;
+        // Sum scaled to a per-second rate.
+        private double tps;
+        // Sum divided by the number of calls in the window.
+        private double avgpt;
+
+        public long getSum() {
+            return sum;
+        }
+
+        public void setSum(long sum) {
+            this.sum = sum;
+        }
+
+        public double getTps() {
+            return tps;
+        }
+
+        public void setTps(double tps) {
+            this.tps = tps;
+        }
+
+        public double getAvgpt() {
+            return avgpt;
+        }
+
+        public void setAvgpt(double avgpt) {
+            this.avgpt = avgpt;
+        }
+    }
+
+    /**
+     * Immutable record of the accumulated call count and value at one point in time.
+     * NOTE(review): declared as a non-static inner class although nothing in it uses the
+     * enclosing instance - consider making it static.
+     */
+    class CallSnapshot {
+        // Wall-clock time of the sample, in epoch milliseconds.
+        private final long timestamp;
+        // Accumulated call count at the time of the sample.
+        private final long times;
+        // Accumulated value at the time of the sample.
+        private final long value;
+
+        public CallSnapshot(long timestamp, long times, long value) {
+            super();
+            this.timestamp = timestamp;
+            this.times = times;
+            this.value = value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public long getTimes() {
+            return times;
+        }
+
+        public long getValue() {
+            return value;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
new file mode 100644
index 0000000..79c1520
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsItemSet.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+
+public class StatsItemSet {
+    private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = new ConcurrentHashMap<String, StatsItem>(128);
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+    /**
+     * Creates an empty set of statistics items. Nothing is scheduled until {@link #init()} is called.
+     *
+     * @param statsName name shared by every item created through this set
+     * @param scheduledExecutorService scheduler used by {@link #init()} and handed to each created item
+     * @param log logger handed to each created item
+     */
+    public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+    }
+
+    public void init() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtMinutes();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextMinutesTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtHour();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextHourTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                                                              @Override
+                                                              public void run() {
+                                                                  try {
+                                                                      printAtDay();
+                                                                  } catch (Throwable ignored) {
+                                                                  }
+                                                              }
+                                                          }, Math.abs(StatsItem.computNextMorningTimeMillis() - System.currentTimeMillis()), //
+            1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /** Takes a one-minute-window sample on every item currently in the table. */
+    private void samplingInSeconds() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.samplingInSeconds();
+        }
+    }
+
+    /** Takes a one-hour-window sample on every item currently in the table. */
+    private void samplingInMinutes() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.samplingInMinutes();
+        }
+    }
+
+    /** Takes a one-day-window sample on every item currently in the table. */
+    private void samplingInHour() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.samplingInHour();
+        }
+    }
+
+    /** Lets every item currently in the table log its one-minute statistics. */
+    private void printAtMinutes() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.printAtMinutes();
+        }
+    }
+
+    /** Lets every item currently in the table log its one-hour statistics. */
+    private void printAtHour() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.printAtHour();
+        }
+    }
+
+    /** Lets every item currently in the table log its one-day statistics. */
+    private void printAtDay() {
+        for (final StatsItem item : this.statsItemTable.values()) {
+            item.printAtDay();
+        }
+    }
+
+    void addValue(final String statsKey, final int incValue, final int incTimes) {
+        this.getAndCreateStatsItem(statsKey).addValue(incValue, incTimes);
+    }
+
+    private StatsItem getAndCreateStatsItem(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+            StatsItem prev = this.statsItemTable.put(statsKey, statsItem);
+            if (null == prev) {
+                // statsItem.init();
+            }
+        }
+
+        return statsItem;
+    }
+
+    /**
+     * @param statsKey key of the item to query
+     * @return the item's one-minute statistics, or an all-zero snapshot if the key is unknown
+     */
+    public StatsItem.StatsSnapshot getStatsDataInMinute(final String statsKey) {
+        final StatsItem item = this.statsItemTable.get(statsKey);
+        return null == item ? new StatsItem.StatsSnapshot() : item.getStatsDataInMinute();
+    }
+
+    /**
+     * @param statsKey key of the item to query
+     * @return the item's one-hour statistics, or an all-zero snapshot if the key is unknown
+     */
+    public StatsItem.StatsSnapshot getStatsDataInHour(final String statsKey) {
+        final StatsItem item = this.statsItemTable.get(statsKey);
+        return null == item ? new StatsItem.StatsSnapshot() : item.getStatsDataInHour();
+    }
+
+    /**
+     * @param statsKey key of the item to query
+     * @return the item's one-day statistics, or an all-zero snapshot if the key is unknown
+     */
+    public StatsItem.StatsSnapshot getStatsDataInDay(final String statsKey) {
+        final StatsItem item = this.statsItemTable.get(statsKey);
+        return null == item ? new StatsItem.StatsSnapshot() : item.getStatsDataInDay();
+    }
+
+    /**
+     * @param statsKey key of the item to look up
+     * @return the item registered under {@code statsKey}, or {@code null} if there is none
+     */
+    public StatsItem getStatsItem(final String statsKey) {
+        return this.statsItemTable.get(statsKey);
+    }
+
+    /**
+     * @return the internal (not copied) table of items, keyed by stats key
+     */
+    ConcurrentHashMap<String, StatsItem> getStatsItemTable() {
+        return statsItemTable;
+    }
+
+    /**
+     * @return the statistic name given at construction
+     */
+    public String getStatsName() {
+        return statsName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
new file mode 100644
index 0000000..540779e
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ThreadStats.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Records, per calling thread, the begin and end timestamps of that thread's most recent
+ * invocation. Entries are keyed by the calling thread's name and id.
+ */
+public class ThreadStats {
+    private final ConcurrentHashMap<Threading, TimestampRegion> statsTable = new ConcurrentHashMap<Threading, TimestampRegion>(64);
+
+    /**
+     * Marks the start of an invocation on the calling thread: stores {@code beginTimestamp}
+     * and resets the end timestamp to {@code -1}.
+     *
+     * @param beginTimestamp start time to record
+     */
+    public void beginInvoke(final long beginTimestamp) {
+        final Threading th = currentThreading();
+
+        TimestampRegion tr = this.statsTable.get(th);
+        if (null == tr) {
+            tr = new TimestampRegion();
+            this.statsTable.put(th, tr);
+        }
+
+        tr.setBeginTimestamp(beginTimestamp);
+        tr.setEndTimestamp(-1);
+    }
+
+    /**
+     * Marks the end of an invocation on the calling thread. If the calling thread has no entry
+     * (it never called {@link #beginInvoke(long)}), the call is ignored instead of failing with
+     * a {@link NullPointerException}.
+     *
+     * @param endTimestamp end time to record
+     */
+    public void endInvoke(final long endTimestamp) {
+        final TimestampRegion tr = this.statsTable.get(currentThreading());
+        if (null != tr) {
+            tr.setEndTimestamp(endTimestamp);
+        }
+    }
+
+    /**
+     * Copies the table into a map sorted by {@link Threading}'s ordering. The copy is shallow:
+     * the returned map shares its {@link TimestampRegion} values with this object.
+     *
+     * @return a new sorted map with the current entries
+     */
+    public TreeMap<Threading, TimestampRegion> cloneStatsTable() {
+        TreeMap<Threading, TimestampRegion> result = new TreeMap<Threading, TimestampRegion>();
+
+        for (final Map.Entry<Threading, TimestampRegion> next : this.statsTable.entrySet()) {
+            result.put(next.getKey(), next.getValue());
+        }
+        return result;
+    }
+
+    /** Builds the table key identifying the calling thread. */
+    private static Threading currentThreading() {
+        final Thread current = Thread.currentThread();
+        return new Threading(current.getName(), current.getId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
new file mode 100644
index 0000000..e2c19b5
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/Threading.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+/**
+ * Identifies a thread by name and id. Usable both as a hash key and as a sorted-map key.
+ * <p>
+ * The ordering is descending: by name first, then by id. A {@code null} name, which
+ * {@link #equals(Object)} and {@link #hashCode()} accept, is treated as smaller than any
+ * non-null name, so {@link #compareTo(Object)} agrees with {@code equals} for such instances
+ * instead of throwing.
+ */
+public class Threading implements Comparable {
+    private String name;
+    private long id;
+
+    public Threading() {
+    }
+
+    public Threading(String name, long id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (int) (id ^ (id >>> 32));
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        Threading threading = (Threading) o;
+
+        return id == threading.id && (name == null ? threading.name == null : name.equals(threading.name));
+
+    }
+
+    @Override
+    public String toString() {
+        return "Threading{" +
+            "name='" + name + '\'' +
+            ", id=" + id +
+            '}';
+    }
+
+    /**
+     * Compares in descending order of name, then descending order of id.
+     *
+     * @param o the other {@code Threading}
+     * @return negative, zero or positive as this instance sorts before, equal to or after {@code o}
+     * @throws ClassCastException if {@code o} is not a {@code Threading}
+     * @throws NullPointerException if {@code o} is {@code null}
+     */
+    @Override
+    public int compareTo(Object o) {
+        Threading t = (Threading) o;
+        // Operands are deliberately swapped (other vs. this) to obtain the descending order.
+        int ret = compareNames(t.name, this.name);
+        if (ret == 0) {
+            return Long.valueOf(t.id).compareTo(this.id);
+        }
+
+        return ret;
+    }
+
+    /** Natural string comparison in which {@code null} sorts before every non-null name. */
+    private static int compareNames(String left, String right) {
+        if (left == null) {
+            return right == null ? 0 : -1;
+        }
+        if (right == null) {
+            return 1;
+        }
+        return left.compareTo(right);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
new file mode 100644
index 0000000..cf8042b
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/TimestampRegion.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+/**
+ * Mutable pair of begin/end timestamps. Both fields are {@code volatile} and
+ * start at {@code -1}.
+ * <p>
+ * {@link #hashCode()} and {@link #equals(Object)} read each volatile field
+ * exactly once, so a single call never mixes bits of two different values of
+ * the same field. The two fields are still read independently, so the pair is
+ * not an atomic snapshot.
+ */
+public class TimestampRegion {
+    private volatile long beginTimestamp = -1;
+    private volatile long endTimestamp = -1;
+
+    /**
+     * @return the begin timestamp, {@code -1} until it has been set
+     */
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    /**
+     * @param beginTimestamp the new begin timestamp
+     */
+    public void setBeginTimestamp(long beginTimestamp) {
+        this.beginTimestamp = beginTimestamp;
+    }
+
+    /**
+     * @return the end timestamp, {@code -1} until it has been set
+     */
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    /**
+     * @param endTimestamp the new end timestamp
+     */
+    public void setEndTimestamp(long endTimestamp) {
+        this.endTimestamp = endTimestamp;
+    }
+
+    /**
+     * Folds both 64-bit timestamps into an {@code int} with the usual 31 multiplier.
+     */
+    @Override
+    public int hashCode() {
+        // Snapshot each volatile once; the fold below uses every value twice.
+        final long begin = beginTimestamp;
+        final long end = endTimestamp;
+        int result = (int) (begin ^ (begin >>> 32));
+        result = 31 * result + (int) (end ^ (end >>> 32));
+        return result;
+    }
+
+    /**
+     * Two regions are equal when they are of exactly the same class and both
+     * timestamps match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TimestampRegion that = (TimestampRegion) o;
+        final long begin = beginTimestamp;
+        final long end = endTimestamp;
+        return begin == that.beginTimestamp && end == that.endTimestamp;
+    }
+
+    /**
+     * Renders the region as {@code TimestampRegion{beginTimestamp=<b>, endTimestamp=<e>}}.
+     */
+    @Override
+    public String toString() {
+        return "TimestampRegion{" +
+            "beginTimestamp=" + beginTimestamp +
+            ", endTimestamp=" + endTimestamp +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
new file mode 100644
index 0000000..376ccda
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/UtilAll.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.metrics;
+
+import java.util.Map;
+
+/**
+ * Static helpers for dumping thread stacks and for launching an external command.
+ */
+public class UtilAll {
+
+    /**
+     * Dumps the stack of every live thread that currently has at least one frame.
+     *
+     * @return one section per thread (header line, one line per frame, blank line);
+     * empty when no thread has frames
+     */
+    public static String jstack() {
+        return jstack(Thread.getAllStackTraces());
+    }
+
+    /**
+     * Formats every entry of {@code map} that has frames, separating threads by a blank line.
+     */
+    private static String jstack(Map<Thread, StackTraceElement[]> map) {
+        StringBuilder result = new StringBuilder();
+        try {
+            for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+                if (appendThread(result, entry.getKey(), entry.getValue())) {
+                    result.append("\n");
+                }
+            }
+        } catch (Throwable ignored) {
+            // Best-effort diagnostics: hand back whatever was collected so far.
+        }
+
+        return result.toString();
+    }
+
+    /**
+     * Dumps the stack of the thread(s) matching both the given name and id.
+     *
+     * @param threadName thread name to match; {@code null} matches nothing
+     * @param threadId thread id to match
+     * @return header line plus one line per frame for each match, or an empty string
+     */
+    public static String jstack(final String threadName, final long threadId) {
+        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
+        StringBuilder result = new StringBuilder();
+        if (threadName == null) {
+            return result.toString();
+        }
+        try {
+            for (final Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+                Thread thread = entry.getKey();
+                if (threadId == thread.getId() && threadName.equals(thread.getName())) {
+                    appendThread(result, thread, entry.getValue());
+                }
+            }
+        } catch (Throwable ignored) {
+            // Best-effort diagnostics: hand back whatever was collected so far.
+        }
+        return result.toString();
+    }
+
+    /**
+     * Appends the header line and one line per stack frame for {@code thread}.
+     *
+     * @return {@code false} (nothing appended) when {@code elements} is null or empty
+     */
+    private static boolean appendThread(final StringBuilder out, final Thread thread,
+        final StackTraceElement[] elements) {
+        if (elements == null || elements.length == 0) {
+            return false;
+        }
+        String threadName = thread.getName();
+        out.append(String.format("%-40s TID: %d STATE: %s\n", threadName, thread.getId(), thread.getState()));
+        for (StackTraceElement el : elements) {
+            out.append(String.format("%-40s %s\n", threadName, el.toString()));
+        }
+        return true;
+    }
+
+    /**
+     * Runs the given command line and waits for it to terminate. The command is
+     * split on runs of whitespace (no quoting support) and executed directly,
+     * not through a shell. Failures to start the process are ignored.
+     * <p>
+     * If the calling thread is interrupted while waiting, the process is
+     * destroyed and the thread's interrupt flag is restored.
+     * <p>
+     * NOTE(review): the child's stdout/stderr are never read, so a command that
+     * produces a lot of output may block in {@code waitFor()} - confirm that
+     * callers only run quiet commands.
+     *
+     * @param shellString the command line; {@code null} or blank is a no-op
+     * @return always {@code null}; no result object is built here
+     */
+    public static ExecuteResult callShellCommand(final String shellString) {
+        if (shellString == null || shellString.trim().isEmpty()) {
+            return null;
+        }
+
+        Process process = null;
+        try {
+            // Split on whitespace runs so repeated or leading spaces do not yield empty arguments.
+            String[] cmdArray = shellString.trim().split("\\s+");
+            process = Runtime.getRuntime().exec(cmdArray);
+            process.waitFor();
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        } catch (Throwable ignored) {
+            // Best effort: a command that cannot be started is deliberately ignored.
+        } finally {
+            if (null != process)
+                process.destroy();
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
new file mode 100644
index 0000000..e7330b8
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/processor/RpcRequestProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.processor;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.rpc.annotation.RemoteService;
+import org.apache.rocketmq.rpc.impl.command.ResponseCode;
+import org.apache.rocketmq.rpc.impl.context.RpcProviderContext;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionInvokeMessage;
+import org.apache.rocketmq.rpc.impl.metrics.ServiceStats;
+import org.apache.rocketmq.rpc.impl.service.RpcEntry;
+import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract;
+import org.apache.rocketmq.rpc.impl.service.RpcServiceCallBody;
+import org.apache.rocketmq.rpc.internal.ServiceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RequestProcessor} that dispatches incoming RPC requests to service
+ * objects registered through {@link #putNewService(Object)}.
+ * <p>
+ * Services are looked up by the request code derived from the
+ * {@link RemoteService} annotation and the interface method. A service may be
+ * registered with its own {@link ExecutorService}; otherwise it is invoked on
+ * the thread that calls {@link #processRequest}.
+ */
+public class RpcRequestProcessor implements RequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(RpcRequestProcessor.class);
+    private final Map<String, RpcEntry> serviceTable = new ConcurrentHashMap<>(64);
+    private final Map<String, ExecutorService> executorTable = new ConcurrentHashMap<>(64);
+    private final ThreadLocal<RpcProviderContext> threadLocalProviderContext;
+    private final RpcInstanceAbstract rpcInstanceAbstract;
+    private final ServiceStats serviceStats;
+
+    public RpcRequestProcessor(ThreadLocal<RpcProviderContext> threadLocalProviderContext,
+        final RpcInstanceAbstract rpcInstanceAbstract, ServiceStats stats) {
+        this.threadLocalProviderContext = threadLocalProviderContext;
+        this.rpcInstanceAbstract = rpcInstanceAbstract;
+        this.serviceStats = stats;
+    }
+
+    /**
+     * Registers {@code obj} without a dedicated executor.
+     *
+     * @see #putNewService(Object, ExecutorService)
+     */
+    public Set<String> putNewService(final Object obj) {
+        return putNewService(obj, null);
+    }
+
+    /**
+     * Registers every exportable method of every {@link RemoteService}-annotated
+     * interface directly implemented by {@code obj}. Interfaces without the
+     * annotation and methods rejected by
+     * {@code ServiceUtil.testServiceExportMethod} are logged and skipped. A later
+     * registration with the same request code replaces the earlier one.
+     *
+     * @param obj the service implementation
+     * @param executorService executor for this service's methods, or {@code null}
+     * to invoke them on the calling thread
+     * @return the key set of the internal service table (a live view covering
+     * all registered request codes, not only those added by this call)
+     */
+    public Set<String> putNewService(final Object obj, final ExecutorService executorService) {
+        Class<?>[] interfaces = obj.getClass().getInterfaces();
+        for (Class<?> itf : interfaces) {
+            RemoteService serviceExport = itf.getAnnotation(RemoteService.class);
+            if (null == serviceExport) {
+                log.warn("Service:{} is not remark annotation", itf.getName());
+                continue;
+            }
+
+            Method[] methods = itf.getMethods();
+            for (Method method : methods) {
+                if (!ServiceUtil.testServiceExportMethod(method)) {
+                    log.error("The method: [{}] not matched RPC standard", method.toGenericString());
+                    continue;
+                }
+
+                String requestCode = ServiceUtil.toRequestCode(serviceExport, method);
+                RpcEntry se = new RpcEntry();
+                se.setServiceExport(serviceExport);
+                se.setObject(obj);
+                se.setMethod(method);
+                this.serviceTable.put(requestCode, se);
+                if (executorService != null) {
+                    this.executorTable.put(requestCode, executorService);
+                }
+            }
+        }
+        return this.serviceTable.keySet();
+    }
+
+    /**
+     * Builds a provider context for the request, exposes it through the
+     * thread-local for the duration of the dispatch, and returns the response
+     * unless the context was told not to return one.
+     *
+     * @return the response command, or {@code null} when
+     * {@code isReturnResponse()} is {@code false} after dispatch
+     */
+    @Override
+    public RemotingCommand processRequest(RemotingChannel channel, RemotingCommand request) {
+        RpcProviderContext rpcProviderContext = new RpcProviderContext();
+        rpcProviderContext.setRemotingChannel(channel);
+        rpcProviderContext.setRemotingRequest(request);
+        rpcProviderContext.setRemotingResponse(rpcInstanceAbstract.remotingService().commandFactory().createResponse(request));
+        rpcProviderContext.setReturnResponse(true);
+        threadLocalProviderContext.set(rpcProviderContext);
+        try {
+            this.processRequest0(rpcProviderContext, request);
+        } finally {
+            // Always clear the slot so a failed dispatch cannot leak this context
+            // into the next request handled by the same thread.
+            threadLocalProviderContext.remove();
+        }
+
+        if (rpcProviderContext.isReturnResponse()) {
+            return rpcProviderContext.getRemotingResponse();
+        }
+        return null;
+    }
+
+    /**
+     * Decodes the call arguments with the serializer named by the request.
+     *
+     * @return the decoded arguments, or {@code null} when decoding failed; in
+     * that case the response has already been filled with
+     * {@code PARAMETER_ERROR} and the failure details
+     */
+    private Object[] buildParameter(final RpcProviderContext context, final RemotingCommand request,
+        RpcServiceCallBody serviceCallBody, Type[] parameterTypes) {
+        Object[] parameters = new Object[parameterTypes.length];
+        try {
+            int index = 0;
+            Serializer serialization = this.rpcInstanceAbstract.remotingService()
+                .serializerFactory().get(request.serializerType());
+            for (Type parameterType : parameterTypes) {
+                parameters[index] = serialization.decode(serviceCallBody.getParameter(index), parameterType);
+                index++;
+            }
+        } catch (Exception e) {
+            ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage();
+            serviceExceptionInvokeMessage.setClassFullName(e.getClass().getName());
+            serviceExceptionInvokeMessage.setErrorMessage(serializeException(request.serializerType(), e));
+            context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR);
+            context.getRemotingResponse().parameter(serviceExceptionInvokeMessage);
+            // Signal the failure so the caller does not invoke the service with
+            // a partially decoded argument array.
+            return null;
+        }
+        return parameters;
+    }
+
+    /**
+     * Encodes {@code exception} with the given serializer and returns the
+     * {@code toString()} of the encoded form.
+     */
+    private String serializeException(byte serializeType, Exception exception) {
+        Serializer serialization = rpcInstanceAbstract.remotingService().serializerFactory().get(serializeType);
+        return serialization.encode(exception).toString();
+    }
+
+    /**
+     * Turns an invocation failure into an error response: picks the response
+     * code from the exception type and attaches a
+     * {@link ServiceExceptionInvokeMessage} describing it. For an
+     * {@link InvocationTargetException} the wrapped target exception supplies
+     * the class name and message.
+     */
+    private void dealWithException(final RpcProviderContext context, RemotingCommand request, Exception exception) {
+        if (exception instanceof IllegalArgumentException) {
+            context.getRemotingResponse().opCode(ResponseCode.PARAMETER_ERROR);
+        } else if (exception instanceof IllegalAccessException) {
+            context.getRemotingResponse().opCode(ResponseCode.ILLEGAL_ACCESS);
+        } else if (exception instanceof NullPointerException) {
+            context.getRemotingResponse().opCode(ResponseCode.NULL_POINTER);
+        } else {
+            // Includes InvocationTargetException, i.e. the service method itself threw.
+            context.getRemotingResponse().opCode(ResponseCode.USER_SERVICE_EXCEPTION);
+        }
+
+        Throwable reported = exception;
+        if (exception instanceof InvocationTargetException) {
+            Throwable target = ((InvocationTargetException) exception).getTargetException();
+            if (target != null) {
+                reported = target;
+            }
+        }
+        String exceptionMsg = reported.getMessage();
+
+        ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = new ServiceExceptionInvokeMessage();
+        serviceExceptionInvokeMessage.setClassFullName(reported.getClass().getName());
+        serviceExceptionInvokeMessage.setErrorMessage(exceptionMsg == null ? "" : exceptionMsg);
+        if (exception.getCause() != null)
+            serviceExceptionInvokeMessage.setThrowable(exception.getCause());
+        Object[] args = new Object[] {serviceExceptionInvokeMessage};
+        Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType());
+        if (serializer != null)
+            context.getRemotingResponse().serializerType(serializer.type());
+        context.getRemotingResponse().parameter(serviceExceptionInvokeMessage);
+    }
+
+    /**
+     * @return {@code true} for methods declared {@code void} (whose reflective
+     * return type is {@code Void.TYPE}) and for methods returning the
+     * {@code Void} wrapper
+     */
+    private static boolean isVoid(final Class<?> returnType) {
+        return Void.TYPE.equals(returnType) || Void.class.equals(returnType);
+    }
+
+    /**
+     * Looks up the target service and invokes it, either inline or on the
+     * executor registered for that service. Requests for an unknown service id
+     * are logged and leave the response untouched.
+     */
+    private void processRequest0(final RpcProviderContext context, final RemotingCommand request) {
+        final RpcServiceCallBody serviceCallBody =
+            request.parameter(this.rpcInstanceAbstract.remotingService().serializerFactory(), RpcServiceCallBody.class);
+        final RpcEntry entry = this.serviceTable.get(serviceCallBody.getServiceId());
+        if (entry == null) {
+            log.warn("No RPC service registered for service id: {}", serviceCallBody.getServiceId());
+            return;
+        }
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                final Method method = entry.getMethod();
+                Type[] parameterTypes = method.getGenericParameterTypes();
+                Object result = null;
+                Exception exception = null;
+                long startTime = System.currentTimeMillis();
+                try {
+                    if (parameterTypes.length == 0) {
+                        result = method.invoke(entry.getObject());
+                    } else {
+                        Object[] parameters = buildParameter(context, request, serviceCallBody, parameterTypes);
+                        if (parameters == null) {
+                            // Decoding failed; the error response is already in place.
+                            return;
+                        }
+                        result = method.invoke(entry.getObject(), parameters);
+                    }
+                    // Statistics are recorded for successful invocations only.
+                    serviceStats.addProviderOKQPSValue(serviceCallBody.getServiceId(), 1, 1);
+                    serviceStats.addProviderRTValue(serviceCallBody.getServiceId(), (int) (System.currentTimeMillis() - startTime), 1);
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                if (exception != null)
+                    dealWithException(context, request, exception);
+                else if (!isVoid(method.getReturnType())) {
+                    Object[] args = new Object[] {result};
+                    Serializer serializer = ServiceUtil.selectSerializer(args, request.serializerType());
+                    if (serializer != null)
+                        context.getRemotingResponse().serializerType(serializer.type());
+                    context.getRemotingResponse().parameter(result);
+                }
+            }
+        };
+        ExecutorService executorService = this.executorTable.get(serviceCallBody.getServiceId());
+        if (executorService != null) {
+            // NOTE(review): the task runs asynchronously while processRequest
+            // returns the response object right after submit, and the
+            // thread-local context is not set on the executor thread. Confirm
+            // that the response is not expected to be complete at that point.
+            executorService.submit(runnable);
+        } else {
+            runnable.run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
new file mode 100644
index 0000000..0be1014
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/DefaultPromise.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.promise;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.rpc.api.Promise;
+import org.apache.rocketmq.rpc.api.PromiseListener;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager;
+import org.apache.rocketmq.rpc.internal.RpcErrorMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lock-based {@link Promise} implementation.
+ * <p>
+ * A promise starts in {@link FutureState#DOING} and leaves that state exactly
+ * once: to {@code DONE} through {@link #set(Object)} or
+ * {@link #setFailure(Throwable)}, or to {@code CANCELLED} when a blocked
+ * {@link #get(long)} times out or is interrupted. The value, the failure and
+ * the state are written together under the internal lock, so a late
+ * {@code set}/{@code setFailure} cannot overwrite an already completed promise.
+ * Listeners are invoked outside the lock, once per completion (or immediately
+ * when added after completion).
+ */
+public class DefaultPromise<V> implements Promise<V> {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+    private final Object lock = new Object();
+    private final long createTime;
+    // Only mutated under the lock while the promise is still DOING.
+    private final List<PromiseListener<V>> promiseListenerList = new ArrayList<>();
+    private volatile FutureState state = FutureState.DOING;
+    // volatile because get() and getThrowable() read these without taking the lock
+    private volatile V result = null;
+    private volatile Throwable exception = null;
+    // NOTE(review): assigned in the constructor but never read in this class.
+    private long timeout;
+
+    public DefaultPromise() {
+        createTime = System.currentTimeMillis();
+        timeout = 5000;
+    }
+
+    /**
+     * Cancellation on request is not implemented.
+     *
+     * @return always {@code false}; the state is never changed
+     */
+    @Override
+    public boolean cancel(final boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return state.isCancelledState();
+    }
+
+    /**
+     * @return {@code true} only in the {@code DONE} state; a cancelled promise
+     * reports {@code false}
+     */
+    @Override
+    public boolean isDone() {
+        return state.isDoneState();
+    }
+
+    /**
+     * Returns the current value without waiting and without throwing a stored failure.
+     *
+     * @return the value, or {@code null} when none has been set
+     */
+    @Override
+    public V get() {
+        return result;
+    }
+
+    /**
+     * Waits for completion and returns the value.
+     * <p>
+     * With {@code timeout <= 0} the call waits indefinitely; an interrupt
+     * cancels the promise with the {@link InterruptedException}. With a
+     * positive {@code timeout} the deadline is measured from the creation of
+     * the promise (not from this call); interrupts are logged and waiting
+     * continues, and the promise is cancelled when the deadline passes. In both
+     * cases the thread's interrupt flag is restored before returning.
+     *
+     * @param timeout maximum age of the promise in milliseconds, or
+     * {@code <= 0} to wait without limit
+     * @return the value set through {@link #set(Object)}
+     * @throws RuntimeException the translated failure when the promise failed
+     * or was cancelled (the exact type is decided by
+     * {@code ServiceExceptionManager.TranslateException})
+     */
+    @Override
+    public V get(final long timeout) {
+        boolean interrupted = false;
+        boolean cancelledHere = false;
+        synchronized (lock) {
+            // Loop on the state so that spurious wakeups do not end the wait early.
+            while (isDoing()) {
+                try {
+                    if (timeout <= 0) {
+                        lock.wait();
+                    } else {
+                        long waitTime = timeout - (System.currentTimeMillis() - createTime);
+                        if (waitTime <= 0) {
+                            cancelledHere = complete(FutureState.CANCELLED, null,
+                                new RuntimeException("Get request result is timeout or interrupted"));
+                            break;
+                        }
+                        lock.wait(waitTime);
+                    }
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    if (timeout <= 0) {
+                        cancelledHere = complete(FutureState.CANCELLED, null, e);
+                    } else {
+                        LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+                    }
+                }
+            }
+        }
+
+        if (interrupted) {
+            // wait() cleared the flag; restore it so the caller can observe the interrupt.
+            Thread.currentThread().interrupt();
+        }
+        if (cancelledHere) {
+            // Listener callbacks run outside the lock.
+            notifyListeners();
+        }
+        return getValueOrThrowable();
+    }
+
+    /**
+     * Completes the promise successfully.
+     *
+     * @param value the result; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false}
+     * if {@code value} is {@code null} or the promise was already completed
+     */
+    @Override
+    public boolean set(final V value) {
+        if (value == null)
+            return false;
+        if (!complete(FutureState.DONE, value, null))
+            return false;
+        notifyListeners();
+        return true;
+    }
+
+    /**
+     * Completes the promise with a failure (the state becomes {@code DONE}).
+     *
+     * @param cause the failure; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false}
+     * if {@code cause} is {@code null} or the promise was already completed
+     */
+    @Override
+    public boolean setFailure(final Throwable cause) {
+        if (cause == null)
+            return false;
+        if (!complete(FutureState.DONE, null, cause))
+            return false;
+        notifyListeners();
+        return true;
+    }
+
+    /**
+     * Registers a listener. If the promise is already completed the listener is
+     * invoked immediately on the calling thread instead of being stored.
+     *
+     * @throws NullPointerException if {@code listener} is {@code null}
+     */
+    @Override
+    public void addListener(final PromiseListener<V> listener) {
+        if (listener == null) {
+            throw new NullPointerException("FutureListener is null");
+        }
+
+        boolean notifyNow = false;
+        synchronized (lock) {
+            if (!isDoing()) {
+                notifyNow = true;
+            } else {
+                promiseListenerList.add(listener);
+            }
+        }
+
+        if (notifyNow) {
+            notifyListener(listener);
+        }
+    }
+
+    /**
+     * @return the stored failure, or {@code null} when there is none
+     */
+    @Override
+    public Throwable getThrowable() {
+        return exception;
+    }
+
+    /**
+     * Invokes every stored listener. Called only after the state has left
+     * {@code DOING}, at which point the list is no longer modified.
+     */
+    private void notifyListeners() {
+        for (PromiseListener<V> listener : promiseListenerList) {
+            notifyListener(listener);
+        }
+    }
+
+    /**
+     * Returns the value, or throws the translated failure. When the stored
+     * failure has a cause, the cause is what gets translated.
+     */
+    private V getValueOrThrowable() {
+        final Throwable failure = exception;
+        if (failure != null) {
+            Throwable e = failure.getCause() != null ? failure.getCause() : failure;
+            throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e);
+        }
+        return result;
+    }
+
+    private boolean isDoing() {
+        return state.isDoingState();
+    }
+
+    /**
+     * Performs the single transition out of {@code DOING}: stores the outcome,
+     * switches the state and wakes all waiters. Listeners are NOT notified
+     * here; the caller does that after releasing the lock.
+     *
+     * @return {@code false} (and nothing is changed) if the promise was already completed
+     */
+    private boolean complete(final FutureState newState, final V value, final Throwable cause) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return false;
+            }
+
+            result = value;
+            exception = cause;
+            state = newState;
+            lock.notifyAll();
+        }
+        return true;
+    }
+
+    /**
+     * Invokes the failure or completion callback depending on whether a failure
+     * is stored. Anything thrown by the listener is logged and swallowed so
+     * that one listener cannot prevent the others from being notified.
+     */
+    private void notifyListener(final PromiseListener<V> listener) {
+        try {
+            if (exception != null)
+                listener.operationFailed(this);
+            else
+                listener.operationCompleted(this);
+        } catch (Throwable t) {
+            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
new file mode 100644
index 0000000..aebc5e3
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.promise;
+
+/**
+ * Lifecycle states of a promise. Each constant carries a fixed numeric code
+ * in {@link #value}.
+ */
+public enum FutureState {
+    /** The task is still in progress. */
+    DOING(0),
+    /** The task has finished. */
+    DONE(1),
+    /** The task has been cancelled. */
+    CANCELLED(2);
+
+    /** Numeric code assigned to the constant. */
+    public final int value;
+
+    FutureState(int value) {
+        this.value = value;
+    }
+
+    /**
+     * @return {@code true} only for {@link #CANCELLED}
+     */
+    public boolean isCancelledState() {
+        return CANCELLED.equals(this);
+    }
+
+    /**
+     * @return {@code true} only for {@link #DONE}
+     */
+    public boolean isDoneState() {
+        return DONE.equals(this);
+    }
+
+    /**
+     * @return {@code true} only for {@link #DOING}
+     */
+    public boolean isDoingState() {
+        return DOING.equals(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
new file mode 100644
index 0000000..ba0180c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/AdvancedServerImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.rpc.impl.server;
+
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.annotation.MethodType;
+import org.apache.rocketmq.rpc.api.AdvancedServer;
+import org.apache.rocketmq.rpc.api.Promise;
+import org.apache.rocketmq.rpc.impl.service.RpcJdkProxy;
+import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory;
+
+/**
+ * {@link AdvancedServer} that issues calls over a caller-supplied
+ * {@link RemotingChannel} by delegating request creation and invocation to a
+ * {@link SimpleServerImpl}.
+ */
+public class AdvancedServerImpl implements AdvancedServer {
+    private final SimpleServerImpl simpleServer;
+
+    public AdvancedServerImpl(final SimpleServerImpl simpleServer) {
+        this.simpleServer = simpleServer;
+    }
+
+    /**
+     * Builds the request, invokes it with {@code MethodType.SYNC} and returns
+     * the invocation result cast to {@code T}.
+     */
+    @Override
+    public <T> T callSync(final RemotingChannel channel, final String serviceCode, final String version,
+        final Object[] parameter, final Class<T> responseType) throws Exception {
+        final RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+        final RpcJdkProxy proxy = proxyFor(channel);
+        final Object reply = simpleServer.invokeRemoteMethod(proxy, serviceCode, request, responseType, MethodType.SYNC);
+        return (T) reply;
+    }
+
+    /**
+     * Builds the request, invokes it with {@code MethodType.ASYNC} and returns
+     * the invocation result cast to a {@link Promise}.
+     */
+    @Override
+    public <T> Promise<T> callAsync(final RemotingChannel channel, final String serviceCode, final String version,
+        final Object[] parameter, final Class<T> responseType) throws Exception {
+        final RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+        final RpcJdkProxy proxy = proxyFor(channel);
+        final Object pending = simpleServer.invokeRemoteMethod(proxy, serviceCode, request, responseType, MethodType.ASYNC);
+        return (Promise<T>) pending;
+    }
+
+    /**
+     * Builds the request and invokes it with a {@code void} response type,
+     * discarding whatever the invocation returns.
+     * <p>
+     * NOTE(review): this passes {@code MethodType.ASYNC}; confirm whether a
+     * dedicated one-way method type was intended.
+     */
+    @Override
+    public void callOneway(final RemotingChannel channel, final String serviceCode, final String version,
+        final Object[] parameter) throws Exception {
+        final RemotingCommand request = simpleServer.createRemoteRequest(serviceCode, version, parameter);
+        final RpcJdkProxy proxy = proxyFor(channel);
+        simpleServer.invokeRemoteMethod(proxy, serviceCode, request, Void.TYPE, MethodType.ASYNC);
+    }
+
+    /**
+     * Creates the proxy used to invoke a call over {@code channel}, passing
+     * {@code null} as the first factory argument as all three call variants do.
+     */
+    private RpcJdkProxy proxyFor(final RemotingChannel channel) {
+        RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleServer, simpleServer.getRemotingServer(), simpleServer.getRpcCommonConfig(), channel);
+        return rpcJdkProxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
new file mode 100644
index 0000000..e076cbe
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.server;
+
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RemotingService;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.netty.RemotingBootstrapFactory;
+import org.apache.rocketmq.rpc.api.AdvancedServer;
+import org.apache.rocketmq.rpc.api.SimpleServer;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract;
+import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory;
+
+/**
+ * {@link SimpleServer} implementation that pairs the service-publishing machinery inherited
+ * from {@link RpcInstanceAbstract} with a {@link RemotingServer}.
+ *
+ * <p>The RPC configuration is read from the {@code rpcCommonConfig} field inherited from
+ * {@link RpcInstanceAbstract}, which the superclass constructor always assigns. This class
+ * deliberately does not declare its own field of that name: a hiding field that is only set
+ * on one construction path would leave {@link #bind} handing a {@code null} configuration to
+ * the proxy factory.
+ */
+public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServer {
+    private RemotingServer remotingServer;
+    private final ExecutorService callServiceThreadPool;
+
+    /**
+     * Creates a server whose {@link RemotingServer} is obtained from
+     * {@link RemotingBootstrapFactory#createRemotingServer}.
+     *
+     * @param remotingConfig configuration used for both the remoting server and this instance
+     */
+    public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
+        this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
+    }
+
+    /**
+     * Creates a server on top of an already constructed {@link RemotingServer}.
+     *
+     * @param remotingConfig configuration for this instance
+     * @param remotingServer remoting server that is started and stopped together with this instance
+     */
+    public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) {
+        super(remotingConfig);
+        this.remotingServer = remotingServer;
+        // The pool is created here, in the constructor every path goes through, so that
+        // stop() never passes a null executor to ThreadUtils.shutdownGracefully.
+        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(remotingConfig.getClientAsyncCallbackExecutorThreads(),
+            remotingConfig.getClientAsyncCallbackExecutorThreads(), remotingConfig.getServiceThreadKeepAliveTime(),
+            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
+            "serverCallServiceThread", true);
+    }
+
+    /**
+     * @return the {@link RemotingServer} backing this instance
+     */
+    @Override
+    public RemotingService remotingService() {
+        return this.remotingServer;
+    }
+
+    /**
+     * No-op in this implementation.
+     */
+    @Override
+    public void registerServiceListener() {
+
+    }
+
+    /**
+     * Creates a proxy of {@code service} whose calls are sent over {@code channel}.
+     *
+     * @param service    interface to proxy
+     * @param channel    channel the proxy is bound to
+     * @param properties not used by this implementation
+     * @return a proxy instance implementing {@code service}
+     */
+    @Override
+    public <T> T bind(final Class<T> service, final RemotingChannel channel, final Properties properties) {
+        return this.narrow0(service, RpcProxyFactory.createServiceProxy(service, this, remotingServer, rpcCommonConfig, channel));
+    }
+
+    /**
+     * @return a new {@link AdvancedServerImpl} wrapping this server; a fresh object per call
+     */
+    @Override
+    public AdvancedServer advancedServer() {
+        return new AdvancedServerImpl(this);
+    }
+
+    /**
+     * Publishes {@code service} through the inherited {@code publishService0}.
+     */
+    @Override
+    public void publish(final Object service) {
+        this.publishService0(service);
+    }
+
+    /**
+     * Publishes {@code service} together with the executor passed on to {@code publishService0}.
+     */
+    @Override
+    public void publish(final Object service, final ExecutorService executorService) {
+        this.publishService0(service, executorService);
+    }
+
+    /**
+     * Runs the inherited start-up and then starts the remoting server.
+     */
+    @Override
+    public void start() {
+        super.start();
+        this.remotingServer.start();
+    }
+
+    /**
+     * Runs the inherited shutdown, shuts the call-service pool down (3000 ms grace period) and
+     * finally stops the remoting server.
+     */
+    @Override
+    public void stop() {
+        super.stop();
+        ThreadUtils.shutdownGracefully(this.callServiceThreadPool, 3000, TimeUnit.MILLISECONDS);
+        this.remotingServer.stop();
+    }
+
+    public RemotingServer getRemotingServer() {
+        return remotingServer;
+    }
+
+    public void setRemotingServer(final RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
new file mode 100644
index 0000000..c0a0e8c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcConnectionListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+
+/**
+ * {@link ChannelEventListener} that calls {@code registerServiceListener()} on its
+ * {@link RpcInstanceAbstract} whenever a channel connects. Close, exception and idle events
+ * are ignored.
+ */
+public class RpcConnectionListener implements ChannelEventListener {
+    private final RpcInstanceAbstract owner;
+
+    /**
+     * @param rpcInstanceAbstract instance notified on every channel connect
+     */
+    public RpcConnectionListener(RpcInstanceAbstract rpcInstanceAbstract) {
+        owner = rpcInstanceAbstract;
+    }
+
+    /**
+     * Triggers service-listener registration; the channel itself is not inspected.
+     */
+    @Override
+    public void onChannelConnect(final RemotingChannel remotingChannel) {
+        owner.registerServiceListener();
+    }
+
+    @Override
+    public void onChannelClose(final RemotingChannel remotingChannel) {
+        // Nothing to do on close.
+    }
+
+    @Override
+    public void onChannelException(final RemotingChannel remotingChannel) {
+        // Nothing to do on exception.
+    }
+
+    @Override
+    public void onChannelIdle(final RemotingChannel remotingChannel) {
+        // Nothing to do on idle.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
new file mode 100644
index 0000000..9bfd53a
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcEntry.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.rpc.annotation.RemoteService;
+
+/**
+ * Mutable holder grouping a {@link RemoteService} annotation, a {@link Method} and an
+ * {@link Object}. All three start out {@code null} and are populated through the setters.
+ * NOTE(review): presumably the object is the instance the method is invoked on - confirm
+ * against the code that fills this entry.
+ */
+public class RpcEntry {
+    private RemoteService serviceExport;
+    private Method method;
+    private Object object;
+
+    /** @return the stored service annotation, or {@code null} if none was set */
+    public RemoteService getServiceExport() {
+        return this.serviceExport;
+    }
+
+    /** @param serviceExport service annotation to store */
+    public void setServiceExport(RemoteService serviceExport) {
+        this.serviceExport = serviceExport;
+    }
+
+    /** @return the stored method, or {@code null} if none was set */
+    public Method getMethod() {
+        return this.method;
+    }
+
+    /** @param method method to store */
+    public void setMethod(Method method) {
+        this.method = method;
+    }
+
+    /** @return the stored object, or {@code null} if none was set */
+    public Object getObject() {
+        return this.object;
+    }
+
+    /** @param object object to store */
+    public void setObject(Object object) {
+        this.object = object;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
new file mode 100644
index 0000000..2b1288c
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.rpc.impl.command.RpcRequestCode;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+import org.apache.rocketmq.rpc.impl.context.RpcProviderContext;
+import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl;
+import org.apache.rocketmq.rpc.impl.metrics.ThreadStats;
+import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor;
+
+import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory;
+
+/**
+ * Common base for RPC instances: owns the {@link RpcRequestProcessor}, the thread pool the
+ * processor is registered with, and the start/stop sequence around the service statistics.
+ */
+public abstract class RpcInstanceAbstract extends RpcProxyCommon {
+    protected final RpcRequestProcessor rpcRequestProcessor;
+    protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>();
+    protected final RpcCommonConfig rpcCommonConfig;
+    protected ThreadStats threadStats;
+    private DefaultServiceAPIImpl defaultServiceAPI;
+    private ThreadPoolExecutor invokeServiceThreadPool;
+
+    /**
+     * Creates the request processor and the bounded pool used to invoke services.
+     *
+     * @param rpcCommonConfig configuration supplying the pool size and queue capacity
+     */
+    public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) {
+        super(rpcCommonConfig);
+        // Assignment order is kept as-is: "this" is handed to the processor below.
+        this.threadStats = new ThreadStats();
+        this.rpcCommonConfig = rpcCommonConfig;
+        this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
+
+        final int corePoolSize = rpcCommonConfig.getClientAsyncCallbackExecutorThreads();
+        final int maximumPoolSize = rpcCommonConfig.getClientAsyncCallbackExecutorThreads();
+        final ArrayBlockingQueue<Runnable> workQueue =
+            new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize());
+        this.invokeServiceThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60, TimeUnit.SECONDS,
+            workQueue, newThreadFactory("rpcInvokeServiceThread", true));
+    }
+
+    /**
+     * Creates the default service API, starts the service statistics, publishes the default
+     * service and registers the request processor for {@link RpcRequestCode#CALL_SERVICE}.
+     */
+    public void start() {
+        this.defaultServiceAPI = new DefaultServiceAPIImpl(this.serviceStats, threadStats);
+        this.serviceStats.start();
+        publishService0(this.defaultServiceAPI);
+        remotingService().registerRequestProcessor(RpcRequestCode.CALL_SERVICE, this.rpcRequestProcessor,
+            this.invokeServiceThreadPool);
+    }
+
+    /**
+     * Stops the service statistics and shuts the invoke pool down with a 3000 ms grace period.
+     */
+    public void stop() {
+        this.serviceStats.stop();
+        ThreadUtils.shutdownGracefully(this.invokeServiceThreadPool, 3000, TimeUnit.MILLISECONDS);
+    }
+
+    /** Hands {@code service} to the request processor. */
+    protected void publishService0(Object service) {
+        rpcRequestProcessor.putNewService(service);
+    }
+
+    /** Hands {@code service} and its executor to the request processor. */
+    protected void publishService0(Object service, ExecutorService executorService) {
+        rpcRequestProcessor.putNewService(service, executorService);
+    }
+
+    /** Asks {@code rpcJdkProxy} for a new proxy instance of {@code service}. */
+    protected <T> T narrow0(Class<T> service, RpcJdkProxy rpcJdkProxy) {
+        final T proxied = rpcJdkProxy.newInstance(service);
+        return proxied;
+    }
+
+    /** Hook invoked by {@link RpcConnectionListener} when a channel connects. */
+    public abstract void registerServiceListener();
+
+    public ThreadPoolExecutor getInvokeServiceThreadPool() {
+        return this.invokeServiceThreadPool;
+    }
+
+    public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) {
+        this.invokeServiceThreadPool = invokeServiceThreadPool;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
new file mode 100644
index 0000000..351c8b4
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager;
+import org.apache.rocketmq.rpc.internal.RpcErrorMapper;
+
+public abstract class RpcJdkProxy implements InvocationHandler {
+    private final Class<?> service;
+    private final RpcProxyCommon rpcProxyCommon;
+
+    public RpcJdkProxy(final Class<?> service, final RpcProxyCommon rpcProxyCommon) {
+        this.service = service;
+        this.rpcProxyCommon = rpcProxyCommon;
+    }
+
+    @Override
+    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+        return rpcProxyCommon.invoke0(proxy, this, service, method, args);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T newInstance(Class<T> service) {
+        try {
+            return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class[] {service}, this);
+        } catch (Exception e) {
+            throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e);
+        }
+    }
+
+    public abstract void invokeOneWay(final RemotingCommand request);
+
+    public abstract void invokeAsync(final RemotingCommand request, final AsyncHandler handler);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
new file mode 100644
index 0000000..4cff74f
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+
+/**
+ * {@link RpcJdkProxy} that sends requests through a {@link RemotingClient} to a fixed
+ * remote address, using the service invoke timeout from {@link RpcCommonConfig}.
+ */
+public class RpcJdkProxyClient extends RpcJdkProxy {
+    private final RemotingClient remotingClient;
+    private final String remotingAddress;
+    private final RpcCommonConfig rpcCommonConfig;
+
+    /**
+     * @param service         interface this proxy stands in for
+     * @param rpcProxyCommon  delegate handling intercepted invocations
+     * @param remotingClient  client used to send requests
+     * @param rpcCommonConfig source of the service invoke timeout
+     * @param remotingAddress address every request is sent to
+     */
+    public RpcJdkProxyClient(final Class<?> service,
+        final RpcProxyCommon rpcProxyCommon,
+        final RemotingClient remotingClient,
+        final RpcCommonConfig rpcCommonConfig,
+        final String remotingAddress) {
+        super(service, rpcProxyCommon);
+        this.remotingClient = remotingClient;
+        this.rpcCommonConfig = rpcCommonConfig;
+        this.remotingAddress = remotingAddress;
+    }
+
+    /** Sends {@code request} one-way to the configured address. */
+    @Override
+    public void invokeOneWay(final RemotingCommand request) {
+        remotingClient.invokeOneWay(remotingAddress, request, rpcCommonConfig.getServiceInvokeTimeout());
+    }
+
+    /** Sends {@code request} asynchronously to the configured address, passing {@code handler} along. */
+    @Override
+    public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) {
+        remotingClient.invokeAsync(remotingAddress, request, handler, rpcCommonConfig.getServiceInvokeTimeout());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
new file mode 100644
index 0000000..c67c8cc
--- /dev/null
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcJdkProxyServer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.impl.service;
+
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig;
+
+/**
+ * {@link RpcJdkProxy} that sends requests through a {@link RemotingServer} over one fixed
+ * {@link RemotingChannel}, using the service invoke timeout from {@link RpcCommonConfig}.
+ */
+public class RpcJdkProxyServer extends RpcJdkProxy {
+    private final RemotingServer remotingServer;
+    private final RemotingChannel remotingChannel;
+    private final RpcCommonConfig rpcCommonConfig;
+
+    /**
+     * @param service         interface this proxy stands in for
+     * @param rpcProxyCommon  delegate handling intercepted invocations
+     * @param remotingServer  server used to send requests
+     * @param rpcCommonConfig source of the service invoke timeout
+     * @param remotingChannel channel every request is sent over
+     */
+    public RpcJdkProxyServer(final Class<?> service,
+        final RpcProxyCommon rpcProxyCommon,
+        final RemotingServer remotingServer,
+        final RpcCommonConfig rpcCommonConfig,
+        final RemotingChannel remotingChannel) {
+        super(service, rpcProxyCommon);
+        this.rpcCommonConfig = rpcCommonConfig;
+        this.remotingChannel = remotingChannel;
+        this.remotingServer = remotingServer;
+    }
+
+    /** Sends {@code request} asynchronously over the bound channel, passing {@code handler} along. */
+    @Override
+    public void invokeAsync(final RemotingCommand request, final AsyncHandler handler) {
+        remotingServer.invokeAsync(remotingChannel, request, handler, rpcCommonConfig.getServiceInvokeTimeout());
+    }
+
+    /** Sends {@code request} one-way over the bound channel. */
+    @Override
+    public void invokeOneWay(final RemotingCommand request) {
+        remotingServer.invokeOneWay(remotingChannel, request, rpcCommonConfig.getServiceInvokeTimeout());
+    }
+}