Posted to dev@flume.apache.org by "lichenglin (JIRA)" <ji...@apache.org> on 2016/11/16 07:16:58 UTC

[jira] [Created] (FLUME-3024) TestThriftRpcClient.testMultipleThreads is not really multipleThreads

lichenglin created FLUME-3024:
---------------------------------

             Summary: TestThriftRpcClient.testMultipleThreads is not really multipleThreads
                 Key: FLUME-3024
                 URL: https://issues.apache.org/jira/browse/FLUME-3024
             Project: Flume
          Issue Type: Bug
          Components: Test
    Affects Versions: v1.6.0
            Reporter: lichenglin


{code}
// client is final so the anonymous Runnable below can capture it.
final ThriftRpcClient client =
		(ThriftRpcClient) RpcClientFactory.getThriftInstance("172.17.0.12", 4444, 10);
int threadCount = 100;
ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount);
ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount);
for (int i = 0; i < threadCount; i++) {
	futures.add(submissionSvc.submit(new Runnable() {
		public void run() {
			try {
				insertAsBatch(client, 0, 9);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}));
}
// Block until every submitted task has finished.
for (int i = 0; i < threadCount; i++) {
	futures.get(i).get();
}
{code}

Although insertAsBatch is submitted to a thread pool, the actual insert appears to happen only when futures.get(i).get() is called.

That call is a synchronous (blocking) action, so the inserts end up being synchronous too.

When the code is changed to:
{code}
submissionSvc.submit(new Runnable() {
	public void run() {
		try {
			insertAsBatch(client, 0, 9);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}).get();
{code}
an exception occurs.
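
To see how much overlap each submission pattern actually gives, a small stand-alone sketch can count the jobs that are in flight at the same time. It involves no Flume or Thrift code; the class name SubmitGetDemo, the method peakInFlight, and the 500 ms wait are all made up for illustration. When get() is called directly on the Future returned by submit(), the loop cannot submit the next job until the current one has finished, so at most one job runs at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitGetDemo {

    /** Runs `tasks` jobs and returns the highest number observed running at once. */
    static int peakInFlight(int tasks, boolean getImmediately) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allStarted = new CountDownLatch(tasks);

        Runnable job = () -> {
            // Record how many jobs are running right now.
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            allStarted.countDown();
            try {
                // Wait briefly for the other jobs to start; give up after the timeout.
                allStarted.await(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
        };

        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Future<?> f = pool.submit(job);
                if (getImmediately) {
                    f.get();          // blocks before the next job is even submitted
                } else {
                    futures.add(f);   // keep submitting, wait afterwards
                }
            }
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("collect futures, then get(): peak = " + peakInFlight(3, false));
        System.out.println("submit(...).get() in the loop: peak = " + peakInFlight(3, true));
    }
}
```

Comparing the two printed peaks on your own machine shows whether a given pattern exercises the client from several threads at once or only from one at a time.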
A real multi-threaded test might look like this:

{code}
package thrift;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class Client {

	static AtomicInteger i = new AtomicInteger(0);

	static ThreadLocal<ThriftSourceProtocol.Client> l = new ThreadLocal<>();

	public static void main(String[] args) throws TException {
		ExecutorService submissionSvc = Executors.newFixedThreadPool(10);
		Map<String, String> map = new HashMap<>();
		for (int j = 0; j < 1000; j++) {
			submissionSvc.submit(new Runnable() {
				@Override
				public void run() {
					try {
						// Lazily open one connection per pool thread.
						if (l.get() == null) {
							l.set(create());
						}
						int k = i.incrementAndGet();
						// Send the current thread's name as the event body.
						l.get().append(new ThriftFlumeEvent(map,
								ByteBuffer.wrap(java.lang.Thread.currentThread().getName().getBytes())));
						System.out.println(k + "****" + java.lang.Thread.currentThread().getName());
					} catch (Exception e) {
						// Drop the connection so the next task on this thread reconnects.
						l.remove();
						e.printStackTrace();
					}
				}

				private thrift.ThriftSourceProtocol.Client create() throws TTransportException {
					TSocket tt = new TSocket("172.17.0.12", 4444);
					TTransport transport = new TFastFramedTransport(tt);
					ThriftSourceProtocol.Client client = new ThriftSourceProtocol.Client(
							new TCompactProtocol(transport));
					transport.open();
					return client;
				}
			});
		}
		// Let the JVM exit once all queued tasks have run.
		submissionSvc.shutdown();
	}
}
{code}

We must make sure each thread has its own connection, so a ThreadLocal is used.
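
The per-thread idea can also be tried without a Thrift server. The sketch below is only an illustration under stated assumptions: FakeConnection is a hypothetical stand-in for a client that is not safe to share between threads, and PerThreadConnectionDemo and distinctConnections are made-up names. It uses ThreadLocal.withInitial so that each pool thread lazily gets its own instance, then counts how many distinct instances the tasks saw.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadConnectionDemo {

    /** Hypothetical stand-in for a non-thread-safe client such as a Thrift connection. */
    static final class FakeConnection {
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        final int id = NEXT_ID.incrementAndGet();
    }

    /** Runs `tasks` jobs on `threads` pool threads; returns how many connections were created. */
    static int distinctConnections(int threads, int tasks) {
        // Each thread that calls local.get() receives its own FakeConnection.
        ThreadLocal<FakeConnection> local = ThreadLocal.withInitial(FakeConnection::new);
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int j = 0; j < tasks; j++) {
                pool.execute(() -> seen.add(local.get().id));
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct connections: " + distinctConnections(10, 1000));
    }
}
```

The number of connections never exceeds the number of pool threads, no matter how many tasks are submitted, which is the property the ThreadLocal is meant to provide.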


