Posted to issues@ignite.apache.org by "Zbyszek B (JIRA)" <ji...@apache.org> on 2018/03/12 11:23:00 UTC

[jira] [Commented] (IGNITE-7918) Huge memory leak when data streamer used together with local cache

    [ https://issues.apache.org/jira/browse/IGNITE-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395086#comment-16395086 ] 

Zbyszek B commented on IGNITE-7918:
-----------------------------------

{code:java}
package leak;

import org.apache.ignite.*;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;


public class Demo {

    private final static Ignite ignite = createIgnite();

    private final static AtomicLong producedCount = new AtomicLong();
    private final static AtomicLong consumedCount = new AtomicLong();
    private final static int numOfLocalFields = 10;
    private static BinaryObject localPrototype = createLocalPrototype(); // prototype of local cache entity with all fields set to null

    private final static int numOfGlobalFields = 5;
    private final static List<String> globalFields = globalFields();
    private static BinaryObject globalPrototype = createGlobalPrototype(); // prototype of cluster-wide cache entity with all fields set to null
    private static final IgniteCache<String, BinaryObject> globalCache = getOrCreateGlobalCache();

    private final static String ID = "id";

    private final static boolean useLocalCacheToCauseMemLeak = true;

    private final static BlockingQueue<Object> queue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) throws Exception {

        CompletableFuture.runAsync(Demo::runProducer);
        CompletableFuture.runAsync(Demo::runConsumer);

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        do {
            String input = br.readLine();
            if ("q".equals(input)) {
                System.out.println("Exit!");
                System.exit(0);
            } else {
                String timestamp = new SimpleDateFormat("HH:mm:ss").format(new Date());
                System.out.println(String.format("[%s] Queue size: %d", timestamp, queue.size()));
                System.out.println(String.format("[%s] Local caches produced: %d", timestamp, producedCount.longValue()));
                System.out.println(String.format("[%s] Local caches consumed: %d", timestamp, consumedCount.longValue()));
                System.out.println(String.format("[%s] Global cache size: %d", timestamp, globalCache.size(CachePeekMode.ALL)));
            }
        } while (true);
    }

    private static void runProducer() {
        do {
            long cacheId = producedCount.incrementAndGet();
            try {
                BinaryObject bObj = createLocalObject(1);
                if (useLocalCacheToCauseMemLeak) {
                    IgniteCache<String, BinaryObject> cache = createLocalCache(cacheId);
                    cache.put(bObj.field(ID), bObj);
                    queue.put(cache);
                } else {
                    Map<String, BinaryObject> cache = new HashMap<>();
                    cache.put(bObj.field(ID), bObj);
                    queue.put(cache);
                }
            } catch (Exception e) {
                System.out.println(e.toString());
                System.exit(1);
            }
        } while (true);
    }


    private static List<?> getSQLSelectRow(BinaryObject localObject) {
        List<Object> res = new ArrayList<>(); // List<Object> so that values can actually be added
        for (String f : globalFields) {
            res.add(localObject.field(f));
        }
        return res;
    }


    @SuppressWarnings("unchecked")
    private static void runConsumer() {
        do {
            try {
                final Object taken = queue.take();
                final List<List<?>> globalSQLRows = new ArrayList<>(); // Note: in practice the rows are generated by executing a SqlFieldsQuery against the local cache; omitted here for the sake of simplicity.
                if (taken instanceof IgniteCache) {
                    final IgniteCache<String, BinaryObject> cache = IgniteCache.class.cast(taken);
                    cache.forEach(e -> {
                        globalSQLRows.add(getSQLSelectRow(e.getValue()));
                    });
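                    // (Illustrative sketch, not part of the original reproducer.) In the real application
                    // the rows above come from a SqlFieldsQuery run against this local cache, roughly as
                    // follows (requires org.apache.ignite.cache.query.SqlFieldsQuery; the field and type
                    // names are this demo's own):
                    //
                    //     SqlFieldsQuery qry = new SqlFieldsQuery(
                    //         "select " + String.join(", ", globalFields) + " from LocalEntity");
                    //     globalSQLRows.addAll(cache.query(qry).getAll());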
                    cache.destroy();
                } else if (taken instanceof Map) {
                    final Map<String, BinaryObject> cache = Map.class.cast(taken);
                    cache.forEach((k, v) -> {
                        globalSQLRows.add(getSQLSelectRow(v));
                    });
                }

                consumedCount.incrementAndGet();

                final IgniteDataStreamer<String, BinaryObject> globalStreamer = createGlobalStreamer(globalCache);
                for (List<?> row : globalSQLRows) {
                    final BinaryObjectBuilder globalBuilder = globalPrototype.toBuilder();
                    for (int i = 0; i < globalFields.size(); i++) {
                        globalBuilder.setField(globalFields.get(i), row.get(i));
                    }
                    final BinaryObject gObj = globalBuilder.build();
                    globalStreamer.addData(gObj.field(ID), gObj);
                }
                globalStreamer.flush();
                globalStreamer.close();

            } catch (Exception e) {
                System.out.println(e.toString());
                System.exit(1);
            }
        } while (true);
    }

    private static Ignite createIgnite() {
        IgniteConfiguration iCfg = new IgniteConfiguration();
        TcpCommunicationSpi tcpCommunication = new TcpCommunicationSpi();
        tcpCommunication.setMessageQueueLimit(1024); // set a limit to avoid the unbounded message queue warning
        iCfg.setCommunicationSpi(tcpCommunication);
        String workDirectory = System.getProperty("user.home") + File.separator + "ignite";
        iCfg.setWorkDirectory(workDirectory);
        System.out.println();
        System.out.println(String.format(">>> Starting Ignite on %s; work directory %s ...", "MyLeakingNode", workDirectory));
        System.out.println();
        final Ignite ignite = Ignition.start(iCfg);
        ClusterNode localNode = ignite.cluster().localNode();
        System.out.println();
        System.out.println(String.format(">>> Ignite started on %s (%s) successfully!", "MyLeakingNode", localNode.id()));
        System.out.println();
        return ignite;
    }

    private static IgniteCache<String, BinaryObject> createLocalCache(long id) {
        final String cacheName = "LocalCache" + id;
        final CacheConfiguration<String, BinaryObject> cCfg = new CacheConfiguration<>();
        cCfg.setName(cacheName);
        cCfg.setStoreKeepBinary(true);
        cCfg.setCacheMode(CacheMode.LOCAL);
        cCfg.setOnheapCacheEnabled(false);
        cCfg.setCopyOnRead(false);
        cCfg.setBackups(0);
        cCfg.setWriteBehindEnabled(false);
        cCfg.setReadThrough(false);
        cCfg.setReadFromBackup(false);

        final QueryEntity queryEntity = new QueryEntity(String.class.getTypeName(), "LocalEntity");
        for (String field : localFields()) {
            queryEntity.addQueryField(field, String.class.getTypeName(), field);
        }
        cCfg.setQueryEntities(Collections.singletonList(queryEntity));

        ignite.destroyCache(cacheName); // a LOCAL cache is not entirely local - other nodes may still hold a reference to it if a restart happens during the load
        return ignite.createCache(cCfg).withKeepBinary();
    }


    private static List<String> localFields() {
        return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1, numOfLocalFields).boxed().map(e -> String.format("field%s", e)))
                .collect(Collectors.toList());
    }

    private static BinaryObject createLocalPrototype() {
        BinaryObjectBuilder builder = ignite.binary().builder("LocalEntity");
        for (String field : localFields()) {
            builder.setField(field, null, String.class);
        }
        return builder.build();
    }

    private static BinaryObject createLocalObject(long id) {
        BinaryObjectBuilder res = localPrototype.toBuilder();
        for (String field : localFields()) {
            if (ID.equals(field)) {
                res.setField(field, String.valueOf(id), String.class);
            } else {
                res.setField(field, id + "@" + field, String.class);
            }
        }
        return res.build();
    }

    private static IgniteCache<String, BinaryObject> getOrCreateGlobalCache() {
        final String cacheName = "GlobalCache";
        final CacheConfiguration<String, BinaryObject> cCfg = new CacheConfiguration<>();
        cCfg.setName(cacheName);
        cCfg.setStoreKeepBinary(true);
        cCfg.setCacheMode(CacheMode.PARTITIONED);
        cCfg.setOnheapCacheEnabled(false);
        cCfg.setCopyOnRead(false);
        cCfg.setBackups(0);
        cCfg.setWriteBehindEnabled(false);
        cCfg.setReadThrough(false);
        cCfg.setQueryEntities(Collections.singletonList(new QueryEntity(String.class.getTypeName(), cacheName))); // must be defined, otherwise the indexing SPI remove method is not called
        return ignite.getOrCreateCache(cCfg).withKeepBinary();
    }

    private static IgniteDataStreamer<String, BinaryObject> createGlobalStreamer(IgniteCache<String, BinaryObject> cache) {
        IgniteDataStreamer<String, BinaryObject> streamer = ignite.dataStreamer(cache.getName());
        streamer.allowOverwrite(true);
        streamer.skipStore(true);
        streamer.keepBinary(true);
        return streamer;
    }

    private static List<String> globalFields() {
        return Stream.concat(Stream.of(ID), IntStream.rangeClosed(1, numOfGlobalFields).boxed().map(e -> String.format("field%s", e)))
                .collect(Collectors.toList());
    }

    private static BinaryObject createGlobalPrototype() {
        BinaryObjectBuilder builder = ignite.binary().builder("GlobalEntity");
        for (String field : globalFields) {
            builder.setField(field, null, String.class);
        }
        return builder.build();
    }
}


{code}
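
For readability, here is the same leak pattern condensed into a few lines. This is a sketch only, not a separate reproducer: it assumes the started ignite instance and the "GlobalCache" created by the demo above, and reuses the demo's own cache, type and field names.

{code:java}
// Condensed restatement of the reproducer above: every iteration creates and destroys
// a LOCAL cache and streams one freshly built binary object into the partitioned "GlobalCache".
for (long i = 0; ; i++) {
    CacheConfiguration<String, BinaryObject> cfg = new CacheConfiguration<>("LocalCache" + i);
    cfg.setCacheMode(CacheMode.LOCAL);
    IgniteCache<String, BinaryObject> local = ignite.createCache(cfg).withKeepBinary();

    BinaryObject localObj = ignite.binary().builder("LocalEntity").setField(ID, String.valueOf(i)).build();
    local.put(String.valueOf(i), localObj);

    BinaryObject read = local.get(String.valueOf(i)); // read the entry back from the local cache
    String idVal = read.field(ID);
    BinaryObject globalObj = ignite.binary().builder("GlobalEntity").setField(ID, idVal).build();

    try (IgniteDataStreamer<String, BinaryObject> streamer = ignite.dataStreamer("GlobalCache")) {
        streamer.allowOverwrite(true);
        streamer.keepBinary(true);
        streamer.addData(idVal, globalObj);
    }

    local.destroy(); // heap usage keeps growing even though the local cache is destroyed each iteration
}
{code}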

> Huge memory leak when data streamer used together with local cache
> ------------------------------------------------------------------
>
>                 Key: IGNITE-7918
>                 URL: https://issues.apache.org/jira/browse/IGNITE-7918
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache
>    Affects Versions: 2.3
>            Reporter: Zbyszek B
>            Priority: Blocker
>
> Dear Igniters,
> We observe a huge memory leak when the data streamer is used together with a local cache.
> In the attached demo, the producer creates a local cache with a single binary object and passes it to a queue. The consumer picks the cache up from the queue, constructs a different binary object from it, adds it to a global partitioned cache, and destroys the local cache.
> This design causes a significant leak - the whole heap is consumed within minutes (no matter whether it is 4G or 24G).
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)