Posted to dev@flume.apache.org by "lichenglin (JIRA)" <ji...@apache.org> on 2016/11/16 07:16:58 UTC
[jira] [Created] (FLUME-3024)
TestThriftRpcClient.testMultipleThreads is not really multipleThreads
lichenglin created FLUME-3024:
---------------------------------
Summary: TestThriftRpcClient.testMultipleThreads is not really multipleThreads
Key: FLUME-3024
URL: https://issues.apache.org/jira/browse/FLUME-3024
Project: Flume
Issue Type: Bug
Components: Test
Affects Versions: v1.6.0
Reporter: lichenglin
{code}
ThriftRpcClient client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("172.17.0.12", 4444, 10);
int threadCount = 100;
ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount);
ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount);
for (int i = 0; i < threadCount; i++) {
    futures.add(submissionSvc.submit(new Runnable() {
        public void run() {
            try {
                insertAsBatch(client, 0, 9);
            } catch (Exception e) {
                e.printStackTrace(); // To change body of catch statement use
                                     // File | Settings | File Templates.
            }
        }
    }));
}
for (int i = 0; i < threadCount; i++) {
    futures.get(i).get();
}
{code}
Although insertAsBatch is submitted to a thread pool, the actual insert seems to happen only when futures.get(i).get() is called.
Those get() calls run one after another, so the inserts are effectively sequential too.
When the task is changed to
{code}
submissionSvc.submit(new Runnable() {
    public void run() {
        try {
            insertAsBatch(client, 0, 9);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}).get()
{code}
an exception occurs.
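One way to check whether submitted tasks really overlap is to count how many are inside the task body at the same moment. The following is only a sketch under stated assumptions: ConcurrencyProbe and maxObservedConcurrency are hypothetical names that are not part of Flume, and Thread.sleep stands in for insertAsBatch(client, 0, 9). A result of 1 would mean the work was serialized; a larger number means tasks ran in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyProbe {

    // Runs taskCount tasks on a fixed pool of threadCount threads and returns
    // the highest number of tasks observed inside the task body at once.
    public static int maxObservedConcurrency(int threadCount, int taskCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(() -> {
                    // Record that one more task has entered the body.
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    try {
                        // Assumption: placeholder for insertAsBatch(client, 0, 9).
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }));
            }
            // Wait for every task, as the original test does.
            for (Future<?> f : futures) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("max tasks in flight: " + maxObservedConcurrency(4, 16));
    }
}
```

Replacing the sleep with the real insertAsBatch call would show how much overlap the existing test actually produces against a running Thrift source.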
Maybe a real test would look like this:
{code}
package thrift;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

// ThriftSourceProtocol, ThriftFlumeEvent and Status are the Thrift-generated
// classes, assumed here to live in this same package.
public class Client {
    static AtomicInteger i = new AtomicInteger(0);
    // One Thrift client (and therefore one connection) per worker thread.
    static ThreadLocal<ThriftSourceProtocol.Client> l = new ThreadLocal<>();

    public static void main(String[] args) throws TException {
        ExecutorService submissionSvc = Executors.newFixedThreadPool(10);
        final Map<String, String> map = new HashMap<>();
        for (int j = 0; j < 1000; j++) {
            submissionSvc.submit(new Runnable() {
                @Override
                public void run() {
                    // Lazily open this thread's connection on first use.
                    if (l.get() == null) {
                        try {
                            l.set(create());
                        } catch (TTransportException e) {
                            e.printStackTrace();
                            return; // no connection, nothing to send
                        }
                    }
                    try {
                        int k = i.incrementAndGet();
                        String name = Thread.currentThread().getName();
                        Status s = l.get().append(
                                new ThriftFlumeEvent(map, ByteBuffer.wrap(name.getBytes())));
                        System.out.println(k + "****" + name);
                    } catch (Exception e) {
                        // Drop the broken client so the next task on this thread reconnects.
                        l.remove();
                        e.printStackTrace();
                    }
                }

                private ThriftSourceProtocol.Client create() throws TTransportException {
                    TSocket tt = new TSocket("172.17.0.12", 4444);
                    TTransport transport = new TFastFramedTransport(tt);
                    ThriftSourceProtocol.Client client = new ThriftSourceProtocol.Client(
                            new TCompactProtocol(transport));
                    transport.open();
                    return client;
                }
            });
        }
        // Let the queued tasks finish, then allow the JVM to exit.
        submissionSvc.shutdown();
    }
}
{code}
We must make sure each thread has its own connection, so use a ThreadLocal.
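The per-thread connection idea can be tried without a running Thrift source. The sketch below is an illustration only: FakeConnection is a hypothetical stand-in for a Thrift client bound to one open transport, and PerThreadConnectionSketch and run are made-up names. It uses ThreadLocal.withInitial so each worker thread lazily creates its own connection and reuses it for every task it picks up.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadConnectionSketch {

    // Hypothetical stand-in for a client that owns one socket.
    static final class FakeConnection {
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        final int id = NEXT_ID.incrementAndGet();
        final Thread owner = Thread.currentThread();

        void append(String body) {
            // A real client would write a frame to its socket here.
            // Fail loudly if another thread ever touches this connection.
            if (owner != Thread.currentThread()) {
                throw new IllegalStateException("connection " + id + " used by a foreign thread");
            }
        }
    }

    // Runs taskCount appends on threadCount threads and returns the number of
    // connections that were created.
    public static int run(int threadCount, int taskCount) throws InterruptedException {
        ThreadLocal<FakeConnection> connection = ThreadLocal.withInitial(FakeConnection::new);
        Set<Integer> createdIds = ConcurrentHashMap.newKeySet();
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int j = 0; j < taskCount; j++) {
            pool.submit(() -> {
                try {
                    FakeConnection c = connection.get(); // created on first use per thread
                    createdIds.add(c.id);
                    c.append(Thread.currentThread().getName());
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        if (failures.get() != 0) {
            throw new IllegalStateException(failures.get() + " appends crossed threads");
        }
        return createdIds.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("connections created: " + run(4, 100));
    }
}
```

The number of connections never exceeds the pool size, because a connection is created only the first time a given worker thread asks for one.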