You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by wa...@apache.org on 2019/12/11 14:39:36 UTC
[dubbo-php-framework] 22/31: update
This is an automated email from the ASF dual-hosted git repository.
wangxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-php-framework.git
commit af3df5c02176b4c598ea55b147b2f7e27af60cd8
Author: wangjinxi <wa...@che001.com>
AuthorDate: Thu Jul 4 18:26:45 2019 +0800
update
---
common/file/FSOFRedis.php | 64 ++++++++++------------
common/protocol/fsof/DubboParser.php | 52 ++++++------------
common/url/FSOFUrl.php | 2 +
config/global/conf/fsof.ini | 10 ++--
consumer/ConsumerException.php | 16 ++++++
consumer/Type.php | 38 ++++++++++++-
consumer/fsof/FSOFProcessor.php | 47 ++++++++--------
consumer/proxy/Proxy.php | 5 +-
consumer/proxy/ProxyFactory.php | 4 +-
demo/demo-consumer/config/log4php.xml | 6 +-
demo/demo-consumer/consumer/demo-consumer.consumer | 10 +---
11 files changed, 143 insertions(+), 111 deletions(-)
diff --git a/common/file/FSOFRedis.php b/common/file/FSOFRedis.php
index e61a8e3..914e0a9 100644
--- a/common/file/FSOFRedis.php
+++ b/common/file/FSOFRedis.php
@@ -22,7 +22,6 @@ use com\fenqile\fsof\common\config\FSOFConstants;
class FSOFRedis
{
- const REDIS_TIME_OUT = 1;
private static $_instance;
@@ -30,7 +29,9 @@ class FSOFRedis
private $logger;
- private $timeout = self::REDIS_TIME_OUT;
+ private $connect_timeout = 1;
+
+ private $read_timeout = 2;
private $retry_count = 1;
@@ -40,13 +41,14 @@ class FSOFRedis
[FSOFConstants::FSOF_SERVICE_REDIS_HOST, FSOFConstants::FSOF_SERVICE_REDIS_PORT],
];
- public static function instance($config)
+ public static function instance($config = [])
{
if (extension_loaded('redis'))
{
if (!isset(FSOFRedis::$_instance))
{
FSOFRedis::$_instance = new FSOFRedis($config);
+ FSOFRedis::$_instance->get_redis();
}
return FSOFRedis::$_instance;
}
@@ -57,10 +59,10 @@ class FSOFRedis
return NULL;
}
- public function __construct($config)
+ public function __construct($config = [])
{
$this->logger = \Logger::getLogger(__CLASS__);
- if($config['redis_hosts'])
+ if(isset($config['redis_hosts']))
{
$this->hosts = [];
$address = explode(',', $config['redis_hosts']);
@@ -69,20 +71,22 @@ class FSOFRedis
$this->hosts[] = [$host, $port??FSOFConstants::FSOF_SERVICE_REDIS_PORT];
}
}
- if($config['redis_connect_timeout'])
+ if(isset($config['redis_connect_timeout']))
+ {
+ $this->connect_timeout = $config['redis_connect_timeout'];
+ }
+ if(isset($config['redis_read_timeout']))
{
- $this->timeout = $config['redis_connect_timeout'];
+ $this->read_timeout = $config['redis_read_timeout'];
}
- if($config['redis_connect_type'])
+ if(isset($config['redis_connect_type']))
{
$this->connect_type = $config['redis_connect_type'];
}
- if($config['redis_retry_count'])
+ if(isset($config['redis_retry_count']))
{
- $this->retry = min($config['redis_retry_count'], 1);
+ $this->retry_count = min($config['redis_retry_count'], 1); // write to the declared retry_count property, the one get_redis() reads
}
-
- $this->get_redis();
}
public function get_redis()
@@ -92,24 +96,25 @@ class FSOFRedis
$hosts_count = count($this->hosts);
$retry = $this->retry_count;
$rand_num = rand() % $hosts_count;
+ $ret = false;
do{
try{
$redis_cli = new \Redis();
- if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_SOCK)
+ if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP)
{
$node = $this->hosts[$rand_num];
- $ret = $redis_cli->connect($node[0],$node[1],$this->timeout);
+ $ret = $redis_cli->connect($node[0],$node[1],$this->connect_timeout);
+ $redis_cli->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
$rand_num = ($rand_num + 1)%$hosts_count;
if (!$ret)
{
$this->logger->warn("connect redis failed[{$node[0]}:{$node[1]}]");
}
}else{
- $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,FSOFConstants::FSOF_SERVICE_REDIS_PORT,$this->timeout);
+ $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,$this->connect_timeout); // 3rd argument of connect() is the timeout in seconds; no port is needed for a unix socket
}
if($ret)
{
- $e = null;
break;
}
}catch (\Exception $e){
@@ -162,27 +167,14 @@ class FSOFRedis
public function getlist($key)
{
- $ret = NULL;
- if (!empty($key))
- {
- try
- {
- if(!isset($this->m_redis))
- {
- $this->get_redis();
- }
- $ret = $this->getlRange($key);
- }
- catch (\Exception $e)
- {
- $this->logger->warn('redis current connect excepiton'.' |errcode:'.$e->getCode().' |errmsg:'.$e->getMessage());
- $this->close();
- //重试一次
- $this->get_redis();
- $ret = $this->getlRange($key);
- }
- }
- return $ret;
+ if (!empty($key) && isset($this->m_redis))
+ {
+ return $this->getlRange($key);
+ }
+ else
+ {
+ return null;
+ }
}
public function set($key, $value)
diff --git a/common/protocol/fsof/DubboParser.php b/common/protocol/fsof/DubboParser.php
index d939b07..b812fae 100644
--- a/common/protocol/fsof/DubboParser.php
+++ b/common/protocol/fsof/DubboParser.php
@@ -1,8 +1,9 @@
<?php
-
namespace com\fenqile\fsof\common\protocol\fsof;
use com\fenqile\fsof\consumer\Type;
+use Icecave\Flax\Serialization\Encoder;
+use Icecave\Flax\DubboParser as Decoder;
/**
*
@@ -71,7 +72,7 @@ class DubboParser
public function packRequest(DubboRequest $request)
{
- if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]) {
+ if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == (self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]??null)) {
$reqData = $this->buildBodyForHessian2($request);
$serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2;
} else {
@@ -104,12 +105,7 @@ class DubboParser
}
$reqData .= json_encode($request->getMethod()) . PHP_EOL;
$reqData .= json_encode($this->typeRefs($request)) . PHP_EOL;
- foreach ($request->getParams() as $value) {
- if ($value instanceof \stdClass) {
- $value = $value->object;
- } elseif ($value instanceof Type) {
- $value = $value->value;
- }
+ foreach (Type::getDataForSafed($request->getParams()) as $value) {
$reqData .= json_encode($value) . PHP_EOL;
}
$attach = array();
@@ -130,27 +126,19 @@ class DubboParser
public function buildBodyForHessian2(DubboRequest $request)
{
- $hess_stream = new \HessianStream();
- $hess_options = new \HessianOptions();
- $hess_factory = new \HessianFactory();
- $writer = $hess_factory->getWriter($hess_stream, $hess_options);
+ $encode = new Encoder();
$reqData = '';
- $reqData .= $writer->writeValue($request->getDubboVersion());
- $reqData .= $writer->writeValue($request->getService());
+ $reqData .= $encode->encode($request->getDubboVersion());
+ $reqData .= $encode->encode($request->getService());
if ($request->getVersion()) {
- $reqData .= $writer->writeValue($request->getVersion());
+ $reqData .= $encode->encode($request->getVersion());
} else {
- $reqData .= $writer->writeValue('');
+ $reqData .= $encode->encode('');
}
- $reqData .= $writer->writeValue($request->getMethod());
- $reqData .= $writer->writeValue($this->typeRefs($request));
- foreach ($request->getParams() as $value) {
- if ($value instanceof \stdClass) {
- $value = $value->object;
- } elseif ($value instanceof Type) {
- $value = $value->value;
- }
- $reqData .= $writer->writeValue($value);
+ $reqData .= $encode->encode($request->getMethod());
+ $reqData .= $encode->encode($this->typeRefs($request));
+ foreach (Type::getDataForSafed($request->getParams()) as $value) {
+ $reqData .= $encode->encode($value);
}
$attach = ['path' => $request->getService(), 'interface' => $request->getService(), 'timeout' => $request->getTimeout()];
if ($request->getGroup()) {
@@ -159,7 +147,7 @@ class DubboParser
if ($request->getVersion()) {
$attach['version'] = $request->getVersion();
}
- $reqData .= $writer->writeValue($attach);
+ $reqData .= $encode->encode($attach);
return $reqData;
}
@@ -184,7 +172,7 @@ class DubboParser
if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) {
$response->setHeartbeatEvent(true);
}
- $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
+ $response->setSerialization($flag & self::SERIALIZATION_MASK);
$response->setLen($_arr["len"]);
return $response;
}
@@ -239,13 +227,9 @@ class DubboParser
private function parseResponseBodyForHessian2(DubboResponse $response)
{
if (!$response->isHeartbeatEvent()) {
- $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN + 1);
- $response->setResponseBody($_data);
- $hess_stream = new \HessianStream($_data);
- $hess_options = new \HessianOptions();
- $hess_factory = new \HessianFactory();
- $parser = $hess_factory->getParser($hess_stream, $hess_options);
- $content = $parser->parseReply();
+ $_data = $response->getFullData();
+ $decode = new Decoder($_data);
+ $content = $decode->getData($_data);
$response->setResult($content);
}
return $response;
diff --git a/common/url/FSOFUrl.php b/common/url/FSOFUrl.php
index f9042a0..bb9fcb4 100644
--- a/common/url/FSOFUrl.php
+++ b/common/url/FSOFUrl.php
@@ -153,6 +153,8 @@ class FSOFUrl
if(isset($getArgs[self::URL_SERIALIZATION]))
{
$this->serialization = $getArgs[self::URL_SERIALIZATION];
+ } else {
+ $this->serialization = 'hessian2';
}
}
diff --git a/config/global/conf/fsof.ini b/config/global/conf/fsof.ini
index d4d698b..aa92cfc 100644
--- a/config/global/conf/fsof.ini
+++ b/config/global/conf/fsof.ini
@@ -1,6 +1,6 @@
[fsof_container_setting]
;php path
-php = '/usr/local/php-7.1.12/bin/php'
+php = '/usr/bin/php'
;app's user
user = root
@@ -16,19 +16,19 @@ p2p_mode = false
zklog_level = 0
;zookeepr log path
-zklog_path = '/home/devops/workspace/dubbo/logs/zookeeper.log'
+zklog_path = '/var/fsof/provider/zookeeper.log'
;zookeeper url list
-zk_url_list = http://192.168.214.148:2181,http://192.168.214.148:2182,http://192.168.214.148:2183
+zk_url_list = http://127.0.0.1:2181
;provider overload mode switch
overload_mode = true
-;if request wait more than waiting_time before processed, we will lost this quest, unit is micro-second
+;if a request waits longer than waiting_time before being processed, it will be dropped; the unit is microseconds
waiting_time = 2000
;if overload_number requests trigger overload rule continuous, we will open loss request mode
overload_number = 5
-;how many quest is lost before lost mode is close
+;how many requests are dropped before loss mode is closed
loss_number = 20
\ No newline at end of file
diff --git a/consumer/ConsumerException.php b/consumer/ConsumerException.php
new file mode 100644
index 0000000..dc2f465
--- /dev/null
+++ b/consumer/ConsumerException.php
@@ -0,0 +1,16 @@
+<?php
+namespace com\fenqile\fsof\consumer;
+
+use Exception;
+
+class ConsumerException extends Exception
+{
+ /**
+ * @param string $message The exception message.
+ * @param Exception|null $previous The previous exception, if any.
+ */
+ public function __construct($message, Exception $previous = null)
+ {
+ parent::__construct($message, 0, $previous);
+ }
+}
\ No newline at end of file
diff --git a/consumer/Type.php b/consumer/Type.php
index 5afa4b5..3000d5b 100644
--- a/consumer/Type.php
+++ b/consumer/Type.php
@@ -140,11 +140,47 @@ class Type
$std = new \stdClass;
foreach ($properties as $key => $value)
{
- $std->$key = ($value instanceof Type) ? $value->value : $value;
+ $std->$key = ($value instanceof Type) ? self::typeTosafe($value) : $value;
}
$std_wrap = new \stdClass();
$std_wrap->object = $std;
$std_wrap->class = 'L'.str_replace('.', '/', $class).';';
return $std_wrap;
}
+
+ public static function getDataForSafed($args)
+ {
+ foreach ($args as &$value)
+ {
+ if ($value instanceof \stdClass) {
+ $value = $value->object;
+ } elseif ($value instanceof Type) {
+ $value = self::typeTosafe($value);
+ }
+ }
+ return $args;
+ }
+
+ public static function typeTosafe(Type $type)
+ {
+ switch ($type->type){
+ case Type::SHORT:
+ case Type::INT:
+ case Type::LONG:
+ $value = (int)$type->value;
+ break;
+ case Type::FLOAT:
+ case Type::DOUBLE:
+ $value = (float)$type->value;
+ break;
+ case Type::BOOLEAN:
+ $value = (bool)$type->value;
+ break;
+ case Type::STRING:
+ default:
+ $value = (string)$type->value;
+ break;
+ }
+ return $value;
+ }
}
\ No newline at end of file
diff --git a/consumer/fsof/FSOFProcessor.php b/consumer/fsof/FSOFProcessor.php
index fd854b5..6555542 100644
--- a/consumer/fsof/FSOFProcessor.php
+++ b/consumer/fsof/FSOFProcessor.php
@@ -22,6 +22,7 @@ use com\fenqile\fsof\common\protocol\fsof\DubboParser;
use com\fenqile\fsof\common\protocol\fsof\DubboRequest;
use com\fenqile\fsof\common\protocol\fsof\DubboResponse;
use com\fenqile\fsof\consumer\client\FSOFClient4Linux;
+use com\fenqile\fsof\consumer\ConsumerException;
class FSOFProcessor
{
@@ -35,6 +36,8 @@ class FSOFProcessor
private $logger;
+ private $iotimeout = 3;
+
public function __construct()
{
$this->logger = \Logger::getLogger(__CLASS__);
@@ -43,6 +46,7 @@ class FSOFProcessor
public function executeRequest(DubboRequest $request, $svrAddr, $ioTimeOut, &$providerAddr)
{
+ $this->iotimeout = $ioTimeOut;
//计算服务端个数
$svrNum = count($svrAddr);
//连接异常重试次数最多2次
@@ -66,10 +70,10 @@ class FSOFProcessor
$request->port = $port;
$request->setGroup($svrUrl->getGroup());
$request->setVersion( $svrUrl->getVersion());
- $request->setTimeout($ioTimeOut * 1000);
+ $request->setTimeout($this->iotimeout * 1000);
$request->setSerialization($svrUrl->getSerialization(DubboParser::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON));
- $client = $this->connectProvider($host, $port, $ioTimeOut);
+ $client = $this->connectProvider($host, $port, $this->iotimeout);
if(empty($client))
{
//记录连接错误日志
@@ -126,7 +130,7 @@ class FSOFProcessor
$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
}
$this->logger->error("send date failed:" . $msg);
- throw new \Exception("发送请求数据失败");
+ throw new ConsumerException("发送请求数据失败");
}
}
catch (\Exception $e)
@@ -139,7 +143,7 @@ class FSOFProcessor
$msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")";
}
$this->logger->error("send date failed:" . $msg, $e);
- throw new \Exception("发送请求数据失败");
+ throw new ConsumerException("发送请求数据失败");
}
try
@@ -157,7 +161,7 @@ class FSOFProcessor
}
else
{
- throw new \Exception("与服务器建立连接失败");
+ throw new ConsumerException("与服务器建立连接失败");
}
return $ret;
}
@@ -206,11 +210,11 @@ class FSOFProcessor
{
if (0 == $socket->getlasterror())
{
- throw new \Exception("provider端己关闭网络连接");
+ throw new ConsumerException("provider端己关闭网络连接");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
@@ -221,7 +225,7 @@ class FSOFProcessor
if (($response) && ($response->getSn() != $request->getSn()))
{
$this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]");
- throw new \Exception("请求包中的sn非法");
+ throw new ConsumerException("请求包中的sn非法");
}
//接收消息体
@@ -256,25 +260,25 @@ class FSOFProcessor
$tmpdata = $this->Recv($socket, $cur_len);
if ($tmpdata)
{
- $recv_data = $recv_data . $tmpdata;
+ $recv_data .= $tmpdata;
$resv_len -= $cur_len;
}
else
{
if (0 == $socket->getlasterror())
{
- throw new \Exception("provider端己关闭网络连接");
+ throw new ConsumerException("provider端己关闭网络连接");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
- //如果超过15秒就当超时处理
- if ((microtime(true) - $start_time) > 15)
+ //如果超过设置的iotimeout就当超时处理
+ if ((microtime(true) - $start_time) > $this->iotimeout)
{
$this->logger->error("Multi recv {$resv_len} bytes data timeout");
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
} while ($resv_len > 0);
@@ -285,7 +289,7 @@ class FSOFProcessor
{
if(DubboResponse::OK != $response->getStatus())
{
- throw new \Exception($response->getErrorMsg());
+ throw new ConsumerException($response->getErrorMsg());
}
else
{
@@ -295,7 +299,7 @@ class FSOFProcessor
else
{
$this->logger->error("parse response body err:".$response->__toString());
- throw new \Exception("未知异常");
+ throw new ConsumerException("未知异常");
}
}
@@ -303,21 +307,20 @@ class FSOFProcessor
{
try
{
+ $start_time = microtime(true);
$resv_len = $len;
$_data = '';
- $cnt = 20;//最多循环20次,防止provider端挂掉时,consumer陷入死循环
do
{
- $cnt--;
$tmp_data = $socket->recv($resv_len);
if (!$tmp_data)
{
$this->logger->warn("socket->recv faile:$resv_len");
break;
}
- $_data = $_data . $tmp_data;
+ $_data .= $tmp_data;
$resv_len -= strlen($tmp_data);
- } while (($resv_len > 0) && ($cnt > 0));
+ } while (($resv_len > 0) && ((microtime(true) - $start_time) < $this->iotimeout)); // keep reading only while the elapsed time is still within the configured io timeout
if ($resv_len > 0)
{
@@ -332,11 +335,11 @@ class FSOFProcessor
$this->logger->error('recv data exception',$e);
if(self::FSOF_CONNECTION_RESET == $e->getCode())
{
- throw new \Exception("未知异常");
+ throw new ConsumerException("未知异常");
}
else
{
- throw new \Exception("接收应答数据超时");
+ throw new ConsumerException("接收应答数据超时");
}
}
}
diff --git a/consumer/proxy/Proxy.php b/consumer/proxy/Proxy.php
index 021cff7..723a4ba 100644
--- a/consumer/proxy/Proxy.php
+++ b/consumer/proxy/Proxy.php
@@ -83,11 +83,14 @@ final class Proxy
protected function generateParamType($args)
{
+ $types = [];
foreach ($args as $val) {
if($val instanceof \stdClass){
$types[] = $val->class;
- }else{
+ }else if($val instanceof Type){
$types[] = Type::adapter[$val->type]??'Ljava/lang/Object;';
+ }else{
+ $types[] = 'Ljava/lang/Object;';
}
}
return $types;
diff --git a/consumer/proxy/ProxyFactory.php b/consumer/proxy/ProxyFactory.php
index 5036b48..0b4c8da 100644
--- a/consumer/proxy/ProxyFactory.php
+++ b/consumer/proxy/ProxyFactory.php
@@ -22,6 +22,7 @@ use com\fenqile\fsof\common\log\FSOFSystemUtil;
use com\fenqile\fsof\common\config\FSOFConstants;
use com\fenqile\fsof\common\config\FSOFCommonUtil;
use com\fenqile\fsof\registry\automatic\ConsumerProxy;
+use com\fenqile\fsof\consumer\ConsumerException;
final class ProxyFactory
@@ -220,7 +221,7 @@ final class ProxyFactory
if (empty($ret))
{
$errMsg = "current_address:".FSOFSystemUtil::getLocalIP()."|".$consumerInterface;
- throw new \Exception($errMsg);
+ throw new ConsumerException($errMsg);
}
else
{
@@ -233,6 +234,7 @@ final class ProxyFactory
{
self::$logger->error('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile.
'|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|errmsg:'. $e->getMessage().'|exceptionmsg:'.$e);
+ throw new ConsumerException($e->getMessage(), $e);
}
return $ret;
}
diff --git a/demo/demo-consumer/config/log4php.xml b/demo/demo-consumer/config/log4php.xml
index 0b8828f..7e89f2f 100644
--- a/demo/demo-consumer/config/log4php.xml
+++ b/demo/demo-consumer/config/log4php.xml
@@ -1,9 +1,9 @@
<configuration xmlns="http://logging.apache.org/log4php/">
- <appender name="myAppender" class="LoggerAppenderDailyFile">
+ <appender name="myAppender" class="LoggerAppenderFile">
+ <param name="file" value="/tmp/consumer.log" />
<layout class="LoggerLayoutPattern">
- <param name="conversionPattern" value="[%date{Y-m-d H:i:s,u}][%level][traceid:%X{traceid}][%logger][%line] %message%newline" />
+ <param name="conversionPattern" value="[%date{Y-m-d H:i:s,u}][%level][%logger][%line] %message%newline" />
</layout>
- <param name="file" value="/home/devops/workspace/dubbo/logs/consumer_%s.log" />
</appender>
<root>
<level value="DEBUG" />
diff --git a/demo/demo-consumer/consumer/demo-consumer.consumer b/demo/demo-consumer/consumer/demo-consumer.consumer
index 81c86a1..8e007d8 100644
--- a/demo/demo-consumer/consumer/demo-consumer.consumer
+++ b/demo/demo-consumer/consumer/demo-consumer.consumer
@@ -1,16 +1,10 @@
[consumer_config]
p2p_mode = false
-#tcp/sock
-redis_connect_type = tcp
-redis_hosts = 127.0.0.1:6379,127.0.0.1:6379
-redis_connect_timeout = 1
-redis_retry_count = 1
-
[consumer_services]
com.fenqile.example.DemoService[group] = *
com.fenqile.example.DemoService[version] = 1.0.0
-com.imooc.springboot.dubbo.demo.ObjectDemoService[group] = *
-com.imooc.springboot.dubbo.demo.ObjectDemoService[version] = 1.0.0
\ No newline at end of file
+com.fenqile.arch.dubbo.service.DemoPhpService[group] = *
+com.fenqile.arch.dubbo.service.DemoPhpService[version] = 1.0.0
\ No newline at end of file